如何在python apache beam中的窗口中订购元素?

时间:2022-02-04 15:36:24

I noticed that java apache beam has class groupby.sortbytimestamp does python have that feature implemented yet? If not what would be the way to sort elements in a window? I figure I could sort the entire window in a DoFn, but I would like to know if there is a better way.

我注意到java apache beam有类groupby.sortbytimestamp python是否已实现该功能?如果不是在窗口中对元素进行排序的方法是什么?我想我可以在DoFn中对整个窗口进行排序,但我想知道是否有更好的方法。

2 个解决方案

#1


6  

There is not currently built-in value sorting in Beam (in either Python or Java). Right now, the best option is to sort the values yourself in a DoFn like you mentioned.

Beam目前没有内置的值排序(Python或Java)。现在,最好的选择是在你提到的DoFn中自己对值进行排序。

#2


1  

Here's a solution using a CombineFn. It has the added bonus of deduplicating data using the TreeSet. You also should make sure your data for a window is small enough to fit in memory on a single worker.

这是使用CombineFn的解决方案。它还具有使用TreeSet对数据进行重复数据删除的额外好处。您还应该确保窗口的数据足够小,以适应单个工作程序的内存。

public static class DedupAndSortByTime extends Combine.CombineFn<MarketData, TreeSet<MarketData>, List<MarketData>> {
@Override
public TreeSet<MarketData> createAccumulator() {
    return new TreeSet<>(Comparator
            .comparingLong(MarketData::getEventTime)
            .thenComparing(MarketData::getOrderbookType));
}

@Override
public TreeSet<MarketData> addInput(TreeSet<MarketData> accum, MarketData input) {
    accum.add(input);
    return accum;
}

@Override
public TreeSet<MarketData> mergeAccumulators(Iterable<TreeSet<MarketData>> accums) {

    TreeSet<MarketData> merged = createAccumulator();
    for (TreeSet<MarketData> accum : accums) {
        merged.addAll(accum);
    }
    return merged;
}

@Override
public List<MarketData> extractOutput(TreeSet<MarketData> accum) {
    return Lists.newArrayList(accum.iterator());
}

}

}

#1


6  

There is not currently built-in value sorting in Beam (in either Python or Java). Right now, the best option is to sort the values yourself in a DoFn like you mentioned.

Beam目前没有内置的值排序(Python或Java)。现在,最好的选择是在你提到的DoFn中自己对值进行排序。

#2


1  

Here's a solution using a CombineFn. It has the added bonus of deduplicating data using the TreeSet. You also should make sure your data for a window is small enough to fit in memory on a single worker.

这是使用CombineFn的解决方案。它还具有使用TreeSet对数据进行重复数据删除的额外好处。您还应该确保窗口的数据足够小,以适应单个工作程序的内存。

public static class DedupAndSortByTime extends Combine.CombineFn<MarketData, TreeSet<MarketData>, List<MarketData>> {
@Override
public TreeSet<MarketData> createAccumulator() {
    return new TreeSet<>(Comparator
            .comparingLong(MarketData::getEventTime)
            .thenComparing(MarketData::getOrderbookType));
}

@Override
public TreeSet<MarketData> addInput(TreeSet<MarketData> accum, MarketData input) {
    accum.add(input);
    return accum;
}

@Override
public TreeSet<MarketData> mergeAccumulators(Iterable<TreeSet<MarketData>> accums) {

    TreeSet<MarketData> merged = createAccumulator();
    for (TreeSet<MarketData> accum : accums) {
        merged.addAll(accum);
    }
    return merged;
}

@Override
public List<MarketData> extractOutput(TreeSet<MarketData> accum) {
    return Lists.newArrayList(accum.iterator());
}

}

}