MongoDB aggregation $group: limiting the length of an array

Date: 2022-03-25 17:43:59

I want to group all the documents according to a field but to restrict the number of documents grouped for each value.

Each message has a conversation_ID. I need to get 10 or fewer messages for each conversation_ID.

I am able to group according to the following command, but can't figure out how to restrict the number of grouped documents apart from slicing the results:

Message.aggregate({'$group':{_id:'$conversation_ID',msgs:{'$push':{msgid:'$_id'}}}})

How can I limit the length of the msgs array for each conversation_ID to 10?

2 Solutions

#1


Modern

From MongoDB 3.6 there is a "novel" approach to this by using $lookup to perform a "self join" in much the same way as the original cursor processing demonstrated below.

Since in this release you can specify a "pipeline" argument to $lookup as a source for the "join", this essentially means you can use $match and $limit to gather and "limit" the entries for the array:

db.messages.aggregate([
  { "$group": { "_id": "$conversation_ID" } },
  { "$lookup": {
    "from": "messages",
    "let": { "conversation": "$_id" },
    "pipeline": [
      { "$match": { "$expr": { "$eq": [ "$conversation_ID", "$$conversation" ] } }},
      { "$limit": 10 },
      { "$project": { "_id": 1 } }
    ],
    "as": "msgs"
  }}
])

You can optionally add additional projection after the $lookup in order to make the array items simply the values rather than documents with an _id key, but the basic result is there by simply doing the above.
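
A minimal sketch of that optional projection, appended as a final stage to the pipeline above, would use $map to flatten each { "_id": ... } document down to its plain value:

  { "$project": {
    "msgs": { "$map": { "input": "$msgs", "as": "m", "in": "$$m._id" } }
  }}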

There is still the outstanding SERVER-9277 which actually requests a "limit to push" directly, but using $lookup in this way is a viable alternative in the interim.
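
For what it's worth, later MongoDB releases (5.2 and above) eventually added $group accumulators such as $firstN that express this "limit to push" directly. A minimal sketch, assuming such a release is available:

db.messages.aggregate([
  { "$group": {
    "_id": "$conversation_ID",
    "msgs": { "$firstN": { "input": "$_id", "n": 10 } }
  }}
])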

NOTE: There is also the $slice aggregation operator, which was introduced after the original answer below was written and is the "outstanding JIRA issue" mentioned in the original content. Whilst you can get the same result with small result sets, it still involves "pushing everything" into the array and then limiting the final array output to the desired length.
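
For reference, a minimal sketch of that $slice form (the aggregation operator is available from MongoDB 3.2); note that every _id is still pushed into the array before it is trimmed:

db.messages.aggregate([
  { "$group": {
    "_id": "$conversation_ID",
    "msgs": { "$push": "$_id" }
  }},
  { "$project": {
    "msgs": { "$slice": [ "$msgs", 10 ] }
  }}
])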

So that's the main distinction, and why $slice is generally not practical for large results. But of course it can be used in cases where it is.

There are a few more details on both alternate usages in mongodb group values by multiple fields.


Original

As stated earlier, this is not impossible but certainly a horrible problem.

Actually, if your main concern is that your resulting arrays are going to be exceptionally large, then your best approach is to submit each distinct "conversation_ID" as an individual query and then combine your results. In very basic MongoDB 2.6 shell syntax, which might need some tweaking depending on what your language implementation actually is:

var results = [];
db.messages.aggregate([
    { "$group": {
        "_id": "$conversation_ID"
    }}
]).forEach(function(doc) {
    db.messages.aggregate([
        { "$match": { "conversation_ID": doc._id } },
        { "$limit": 10 },
        { "$group": {
            "_id": "$conversation_ID",
            "msgs": { "$push": "$_id" }
        }}
    ]).forEach(function(res) {
        results.push( res );
    });
});

But it all depends on whether that is what you are trying to avoid. So on to the real answer:


The first issue here is that there is no function to "limit" the number of items that are "pushed" into an array. It is certainly something we would like, but the functionality does not presently exist.

The second issue is that even when pushing all items into an array, you cannot use $slice, or any similar operator in the aggregation pipeline. So there is no present way to get just the "top 10" results from a produced array with a simple operation.

But you can actually produce a set of operations to effectively "slice" on your grouping boundaries. It is fairly involved, and as an example here I will "slice" the arrays to "six" elements only. The main reason is to demonstrate the process and show how to do this without being destructive to arrays that do not contain the total you want to "slice" to.

Given a sample of documents:

{ "_id" : 1, "conversation_ID" : 123 }
{ "_id" : 2, "conversation_ID" : 123 }
{ "_id" : 3, "conversation_ID" : 123 }
{ "_id" : 4, "conversation_ID" : 123 }
{ "_id" : 5, "conversation_ID" : 123 }
{ "_id" : 6, "conversation_ID" : 123 }
{ "_id" : 7, "conversation_ID" : 123 }
{ "_id" : 8, "conversation_ID" : 123 }
{ "_id" : 9, "conversation_ID" : 123 }
{ "_id" : 10, "conversation_ID" : 123 }
{ "_id" : 11, "conversation_ID" : 123 }
{ "_id" : 12, "conversation_ID" : 456 }
{ "_id" : 13, "conversation_ID" : 456 }
{ "_id" : 14, "conversation_ID" : 456 }
{ "_id" : 15, "conversation_ID" : 456 }
{ "_id" : 16, "conversation_ID" : 456 }

You can see there that when grouping by your conditions you will get one array with ten elements and another with "five". What you want to do here is reduce both to the top "six" without "destroying" the array that will only match "five" elements.

And the following query:

db.messages.aggregate([
    { "$group": {
        "_id": "$conversation_ID",
        "first": { "$first": "$_id" },
        "msgs": { "$push": "$_id" },
    }},
    { "$unwind": "$msgs" },
    { "$project": {
        "msgs": 1,
        "first": 1,
        "seen": { "$eq": [ "$first", "$msgs" ] }
    }},
    { "$sort": { "seen": 1 }},
    { "$group": {
        "_id": "$_id",
        "msgs": { 
            "$push": {
               "$cond": [ { "$not": "$seen" }, "$msgs", false ]
            }
        },
        "first": { "$first": "$first" },
        "second": { "$first": "$msgs" }
    }},
    { "$unwind": "$msgs" },
    { "$project": {
        "msgs": 1,
        "first": 1,
        "second": 1,
        "seen": { "$eq": [ "$second", "$msgs" ] }
    }},
    { "$sort": { "seen": 1 }},
    { "$group": {
        "_id": "$_id",
        "msgs": { 
            "$push": {
               "$cond": [ { "$not": "$seen" }, "$msgs", false ]
            }
        },
        "first": { "$first": "$first" },
        "second": { "$first": "$second" },
        "third": { "$first": "$msgs" }
    }},
    { "$unwind": "$msgs" },
    { "$project": {
        "msgs": 1,
        "first": 1,
        "second": 1,
        "third": 1,
        "seen": { "$eq": [ "$third", "$msgs" ] },
    }},
    { "$sort": { "seen": 1 }},
    { "$group": {
        "_id": "$_id",
        "msgs": { 
            "$push": {
               "$cond": [ { "$not": "$seen" }, "$msgs", false ]
            }
        },
        "first": { "$first": "$first" },
        "second": { "$first": "$second" },
        "third": { "$first": "$third" },
        "forth": { "$first": "$msgs" }
    }},
    { "$unwind": "$msgs" },
    { "$project": {
        "msgs": 1,
        "first": 1,
        "second": 1,
        "third": 1,
        "forth": 1,
        "seen": { "$eq": [ "$forth", "$msgs" ] }
    }},
    { "$sort": { "seen": 1 }},
    { "$group": {
        "_id": "$_id",
        "msgs": { 
            "$push": {
               "$cond": [ { "$not": "$seen" }, "$msgs", false ]
            }
        },
        "first": { "$first": "$first" },
        "second": { "$first": "$second" },
        "third": { "$first": "$third" },
        "forth": { "$first": "$forth" },
        "fifth": { "$first": "$msgs" }
    }},
    { "$unwind": "$msgs" },
    { "$project": {
        "msgs": 1,
        "first": 1,
        "second": 1,
        "third": 1,
        "forth": 1,
        "fifth": 1,
        "seen": { "$eq": [ "$fifth", "$msgs" ] }
    }},
    { "$sort": { "seen": 1 }},
    { "$group": {
        "_id": "$_id",
        "msgs": { 
            "$push": {
               "$cond": [ { "$not": "$seen" }, "$msgs", false ]
            }
        },
        "first": { "$first": "$first" },
        "second": { "$first": "$second" },
        "third": { "$first": "$third" },
        "forth": { "$first": "$forth" },
        "fifth": { "$first": "$fifth" },
        "sixth": { "$first": "$msgs" },
    }},
    { "$project": {
         "first": 1,
         "second": 1,
         "third": 1,
         "forth": 1,
         "fifth": 1,
         "sixth": 1,
         "pos": { "$const": [ 1,2,3,4,5,6 ] }
    }},
    { "$unwind": "$pos" },
    { "$group": {
        "_id": "$_id",
        "msgs": {
            "$push": {
                "$cond": [
                    { "$eq": [ "$pos", 1 ] },
                    "$first",
                    { "$cond": [
                        { "$eq": [ "$pos", 2 ] },
                        "$second",
                        { "$cond": [
                            { "$eq": [ "$pos", 3 ] },
                            "$third",
                            { "$cond": [
                                { "$eq": [ "$pos", 4 ] },
                                "$forth",
                                { "$cond": [
                                    { "$eq": [ "$pos", 5 ] },
                                    "$fifth",
                                    { "$cond": [
                                        { "$eq": [ "$pos", 6 ] },
                                        "$sixth",
                                        false
                                    ]}
                                ]}
                            ]}
                        ]}
                    ]}
                ]
            }
        }
    }},
    { "$unwind": "$msgs" },
    { "$match": { "msgs": { "$ne": false } }},
    { "$group": {
        "_id": "$_id",
        "msgs": { "$push": "$msgs" }
    }}
])

You get the top results in the array, up to six entries:

{ "_id" : 123, "msgs" : [ 1, 2, 3, 4, 5, 6 ] }
{ "_id" : 456, "msgs" : [ 12, 13, 14, 15 ] }

As you can see here, loads of fun.

After you have initially grouped, you basically want to "pop" the $first value off of the stack for the array results. To simplify this process a little, we actually do this in the initial operation. So the process becomes:

  • $unwind the array
  • Compare to the values already seen with an $eq equality match
  • $sort the results to "float" false unseen values to the top (this still retains order)
  • $group back again and "pop" the $first unseen value as the next member on the stack. This also uses the $cond operator to replace "seen" values in the array stack with false to help in the evaluation (one iteration of this loop is sketched right after this list).
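
Concretely, one iteration of that "pop" loop consists of these four stages, taken verbatim from the query above and repeated with the next ordinal field each time:

{ "$unwind": "$msgs" },
{ "$project": {
    "msgs": 1,
    "first": 1,
    "seen": { "$eq": [ "$first", "$msgs" ] }
}},
{ "$sort": { "seen": 1 }},
{ "$group": {
    "_id": "$_id",
    "msgs": { "$push": { "$cond": [ { "$not": "$seen" }, "$msgs", false ] } },
    "first": { "$first": "$first" },
    "second": { "$first": "$msgs" }
}}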

The final action with $cond is there to make sure that future iterations are not just adding the last value of the array over and over where the "slice" count is greater than the array members.

That whole process needs to be repeated for as many items as you wish to "slice". Since we already found the "first" item in the initial grouping, that means n-1 iterations for the desired slice result.

The final steps are really just an optional illustration of converting everything back into arrays for the result as finally shown. So it really is just conditionally pushing items or false back by their matching position, and finally "filtering" out all the false values so the end arrays have "six" and "five" members respectively.

So there is not a standard operator to accommodate this, and you cannot just "limit" the push to 5 or 10 or whatever items in the array. But if you really have to do it, then this is your best approach.


You could possibly approach this with mapReduce and forsake the aggregation framework altogether. The approach I would take (within reasonable limits) would be to effectively have an in-memory hash-map on the server and accumulate arrays into that, while using JavaScript slice to "limit" the results:

db.messages.mapReduce(
    function () {

        if ( !stash.hasOwnProperty(this.conversation_ID) ) {
            stash[this.conversation_ID] = [];
        }

        if ( stash[this.conversation_ID].length < maxLen ) {
            stash[this.conversation_ID].push( this._id );
            emit( this.conversation_ID, 1 );
        }

    },
    function(key,values) {
        return 1;   // really just want to keep the keys
    },
    { 
        "scope": { "stash": {}, "maxLen": 10 },
        "finalize": function(key,value) {
            return { "msgs": stash[key] };                
        },
        "out": { "inline": 1 }
    }
)

So that just basically builds up the "in-memory" object matching the emitted "keys", with an array never exceeding the maximum size you want to fetch from your results. Additionally, this does not even bother to "emit" the item once the maximum stack size for a key is reached.

The reduce part actually does nothing other than essentially reduce to "key" and a single value. So, just in case our reducer did not get called, as would be true if only 1 value existed for a key, the finalize function takes care of mapping the "stash" keys to the final output.
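
Assuming the sample documents shown earlier and the maxLen of 10, the inline output would look something like this (only the shape matters here; the surrounding mapReduce statistics fields are omitted):

{
    "results" : [
        { "_id" : 123, "value" : { "msgs" : [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ] } },
        { "_id" : 456, "value" : { "msgs" : [ 12, 13, 14, 15, 16 ] } }
    ]
}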

The effectiveness of this varies with the size of the output, and JavaScript evaluation is certainly not fast, but it is possibly faster than processing large arrays in a pipeline.


Vote up the JIRA issues to actually have a "slice" operator or even a "limit" on "$push" and "$addToSet", which would both be handy. Personally hoping that at least some modification can be made to the $map operator to expose the "current index" value when processing. That would effectively allow "slicing" and other operations.

Really you would want to code this up to "generate" all of the required iterations. If the answer here gets enough love and/or other time pending that I have in tuits, then I might add some code to demonstrate how to do this. It is already a reasonably long response.


Code to generate the pipeline:

var key = "$conversation_ID";
var val = "$_id";
var maxLen = 10;

var stack = [];
var pipe = [];
var fproj = { "$project": { "pos": { "$const": []  } } };

for ( var x = 1; x <= maxLen; x++ ) {

    fproj["$project"][""+x] = 1;
    fproj["$project"]["pos"]["$const"].push( x );

    var rec = {
        "$cond": [ { "$eq": [ "$pos", x ] }, "$"+x ]
    };
    if ( stack.length == 0 ) {
        rec["$cond"].push( false );
    } else {
        var lval = stack.pop();
        rec["$cond"].push( lval );
    }

    stack.push( rec );

    if ( x == 1) {
        pipe.push({ "$group": {
           "_id": key,
           "1": { "$first": val },
           "msgs": { "$push": val }
        }});
    } else {
        pipe.push({ "$unwind": "$msgs" });
        var proj = {
            "$project": {
                "msgs": 1
            }
        };

        proj["$project"]["seen"] = { "$eq": [ "$"+(x-1), "$msgs" ] };

        var grp = {
            "$group": {
                "_id": "$_id",
                "msgs": {
                    "$push": {
                        "$cond": [ { "$not": "$seen" }, "$msgs", false ]
                    }
                }
            }
        };

        for ( var n = x; n >= 1; n-- ) {
            if ( n != x ) 
                proj["$project"][""+n] = 1;
            grp["$group"][""+n] = ( n == x ) ? { "$first": "$msgs" } : { "$first": "$"+n };
        }

        pipe.push( proj );
        pipe.push({ "$sort": { "seen": 1 } });
        pipe.push(grp);
    }
}

pipe.push(fproj);
pipe.push({ "$unwind": "$pos" });
pipe.push({
    "$group": {
        "_id": "$_id",
        "msgs": { "$push": stack[0] }
    }
});
pipe.push({ "$unwind": "$msgs" });
pipe.push({ "$match": { "msgs": { "$ne": false } }});
pipe.push({
    "$group": {
        "_id": "$_id",
        "msgs": { "$push": "$msgs" }
    }
}); 
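
Once built, the generated pipeline is just a regular array of stages and can be passed straight to aggregate:

db.messages.aggregate(pipe);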

That builds the basic iterative approach up to maxLen with the steps from $unwind to $group. Also embedded in there are details of the final projections required and the "nested" conditional statement. The last is basically the approach taken on this question:

Does MongoDB's $in clause guarantee order?

#2


The $slice operator is not an aggregation operator, so you can't do this (as I originally suggested in this answer, before the edit):

db.messages.aggregate([
   { $group : {_id:'$conversation_ID',msgs: { $push: { msgid:'$_id' }}}},
   { $project : { _id : 1, msgs : { $slice : 10 }}}]);

Neil's answer is very detailed, but you can use a slightly different approach (if it fits your use case). You can aggregate your results and output them to a new collection:

db.messages.aggregate([
   { $group : {_id:'$conversation_ID',msgs: { $push: { msgid:'$_id' }}}},
   { $out : "msgs_agg" }
]);

The $out operator will write the results of the aggregation to a new collection. You can then use a regular find query to project your results with the $slice operator:

db.msgs_agg.find({}, { msgs : { $slice : 10 }});

For these test documents:

> db.messages.find().pretty();
{ "_id" : 1, "conversation_ID" : 123 }
{ "_id" : 2, "conversation_ID" : 123 }
{ "_id" : 3, "conversation_ID" : 123 }
{ "_id" : 4, "conversation_ID" : 123 }
{ "_id" : 5, "conversation_ID" : 123 }
{ "_id" : 7, "conversation_ID" : 1234 }
{ "_id" : 8, "conversation_ID" : 1234 }
{ "_id" : 9, "conversation_ID" : 1234 }

The result will be:

> db.msgs_agg.find({}, { msgs : { $slice : 10 }});
{ "_id" : 1234, "msgs" : [ { "msgid" : 7 }, { "msgid" : 8 }, { "msgid" : 9 } ] }
{ "_id" : 123, "msgs" : [ { "msgid" : 1 }, { "msgid" : 2 }, { "msgid" : 3 }, 
                          { "msgid" : 4 }, { "msgid" : 5 } ] }

Edit

I assume this would mean duplicating the whole messages collection. Isn't that overkill?

Well, obviously this approach won't scale with huge collections. But since you're considering using large aggregation pipelines or large map-reduce jobs, you probably won't use this for "real-time" requests.

There are many cons to this approach: the 16 MB BSON document limit if you're creating huge documents with aggregation, wasted disk space / memory from duplication, increased disk IO...

The pros of this approach: it's simple to implement and thus easy to change. If your collection is rarely updated you can use this "out" collection like a cache. That way you wouldn't have to perform the aggregation multiple times, and you could then even support "real-time" client requests on the "out" collection. To refresh your data, you can periodically re-run the aggregation (e.g. in a background job that runs nightly).
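
As a minimal sketch of such a periodic refresh, assuming a script file run by a nightly scheduler through the shell (the file name and invocation here are hypothetical), re-running the $out aggregation simply replaces the "msgs_agg" collection with fresh results:

// refresh_msgs_agg.js -- hypothetical script, run e.g. nightly: mongo mydb refresh_msgs_agg.js
db.messages.aggregate([
   { $group : { _id : '$conversation_ID', msgs : { $push : { msgid : '$_id' } } } },
   { $out : "msgs_agg" }
]);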

As was said in the comments, this isn't an easy problem and there isn't a perfect solution for it (yet!). I showed you another approach you can use; it's up to you to benchmark and decide what's most appropriate for your use case.
