最近在使用 Fabric8 Kubernetes Client
的过程中发现了新大陆一样,感觉利用这个库可以进行很多有趣的功能尝试,其中一个便是日志的本地化。
原因无他,rancher 页面性能实在太差了,经常性的暂停工作,碰到故障排查的时候,着实让人恼火。当我看到 Fabric8 Kubernetes Client
的日志相关 API 的时候我就立刻冒出来写一个日志小工具的想法。
API简介
首先我们简单介绍一下 API,以方便快速进入场景。后续等我自觉学得差不多了,再来列个专题给大家分享 Fabric8 Kubernetes Client
的全部 API 实践经验。
在本次分享当中,主要用到了两种日志 API:getLog()
、 watchLog()
。
以下是 Fabric8 Kubernetes Client 日志功能的结构化总结:
功能点与 API 对照表
功能点 | API 方法 | 适用场景 |
---|---|---|
获取 Pod 日志 | getLog() |
一次性获取短时任务或静态日志 |
获取特定容器日志 | inContainer("name").getLog() |
多容器 Pod 中指定容器的日志 |
实时日志流 | watchLog(outputStream) |
持续监控运行中的服务日志 |
按行获取最新日志 | tailingLines(n).getLog() |
仅需关注最近 N 行日志的场景 |
时间范围筛选 | sinceSeconds(n).getLog() |
获取最近 N 秒内的日志 |
带时间戳日志 | withTimestamps().getLog() |
需要精确时间信息的调试场景 |
获取历史日志(Terminated容器) | previousLog() |
排查已终止或重启容器的日志 |
批量获取 Pod 日志 | withLabel("key=value").list() |
根据标签筛选多个副本的日志 |
日志持久化到文件 | watchLog(new FileOutputStream(...)) |
长期存档或离线分析日志 |
首选日志流
对于日志需求来讲,流式调用自然是最好不过了,可以及时获取最新的日志信息,还不用后期干预。这里我选择了watchLog()(无参调用),watchLog()
返回一个 LogWatch 实例,该实例包含 getOutput()
方法,可获取日志流。适用于 手动解析日志流,比 watchLog(System.out)
更灵活。
import com.auto.fault.framework.funtester.frame.SourceCode
import groovy.util.logging.Log4j2
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.LogWatch
@Log4j2
class TesClient extends SourceCode {
public static void main(String[] args) {
try (KubernetesClient client = new DefaultKubernetesClient();
def pods = client.pods().inNamespace("test").list()
def marketPod = pods.getItems().find {
it.getMetadata().getName().contains("FunTester-pod")
}.getMetadata().getName();
LogWatch logWatch = client.pods()
.inNamespace("FunTester-default")
.withName("FunTester-mypod")
.watchLog()) {
logWatch.getOutput().eachLine {
System.out.println("Pod Log: " + it);
}
} catch (Exception e) {
e.printStackTrace();
}
waitForKey("Press any key to exit")
}
}
但是在实际的使用当中,经过几分钟,最长不超过十几分钟之后,流里面居然不再输出日志信创了,感觉很奇怪。经过查询资料和多方面验证,依旧没有解决问题,rancher 自带的WebSocket 推送日志也遇到这个问题。最终还是把问题甩给了服务端。
目前这种方式只适用于调试过程中查看日志,使用的时候本地启动一个脚本用来实时展示日志的情况。
下面是我的封装方法,仅供参考:
/
* 处理日志流, 通过 WatchLog 方式, 适用于实时日志, 适用于日志量较小的场景,可能会被中断
* @param client
* @param namespace
* @param podName
* @param consumer
* @return
*/
static def handlePodLogFlow(String namespace, String podName, Consumer<String> consumer) {
try (def logWatch = K8sService.client.pods()
.inNamespace(namespace)
.withName(podName)
.tailingLines(200)
.watchLog()) {
logWatch.getOutput().eachLine {
consumer(it)
}
} catch (e) {
log.error("handle log error: {}", e)
}
}
循环拉取
剩下的另外一个方式就是定时任务实现循环拉取日志了,用到了 sinceSeconds()
这个 API,逻辑也比较简单就是每隔一段时间拉取最近一段时间的日志。
但是在使用当中遇到一个问题,由于执行耗时以及网络原因,如果我每 10s 拉取最近 10s 的日志总会丢日志,如果拉取最近 11s 的日志,又会有一些重复的日志。
为了解决这个问题,我特意咨询了AI,给了下面三种思路。
- 基于日志 ID 去重:如果日志自带唯一标识(如 requestId、traceId),可以使用 集合(Set) 记录已处理的日志 ID,在处理新日志时先检查 ID 是否已存在,若已存在则跳过。这种方式精准高效,但 Set 可能会无限增长,需要定期清理,适用于日志 ID 唯一且可以长期存储 ID 记录的场景。
- 基于时间窗口去重:如果日志没有唯一 ID,但有时间戳,可以记录 上一次处理的最大时间戳,只处理时间戳 大于 这个值的日志,处理完成后更新最大时间戳。这种方式内存占用小,适用于 时间戳严格递增 的情况,但如果日志乱序,可能会丢失部分数据。
- 基于 LRU(最近最少使用)缓存去重:如果日志 ID 变化范围大,但不能无限存储已处理 ID,可以使用 固定大小的缓存(如 LRU 哈希表)存储最近 N 条已处理的日志 ID,超出部分自动删除。这种方式适用于 高并发、大量日志流入 的情况,能有效控制内存占用,但需要合理设置缓存大小,以平衡去重效果与资源消耗。
最终我选了基于时间窗户,日志返回的时间是毫秒时间戳,这样根据时间戳进行筛选,可以避免重复和丢日志的情况。
下面是我的封装代码:
/**
* 处理日志流, 通过 getlogs 方式, 定时获取任务,避免中断
* @param namespace
* @param podName
* @param consumer
* @return
*/
static def handleLogs(String namespace, String podName, Consumer<String> consumer) {
long lastTime
ThreadPoolUtil.scheduleRate({
time {
try (def reader = K8sService.client.pods()
.inNamespace(namespace)
.withName(podName)
.sinceSeconds(11)
.getLogReader()) {
def lines = reader.readLines()
boolean start = false
lines.each {
if (!start && getTimestamp(it) > lastTime) {
start = true
}
if (start) {
consumer(it)
}
}
lastTime = getTimestamp(lines.get(lines.size() - 1));
} catch (Exception e) {
log.error("handle log error: {}", e)
}
}
}, 10)
}
使用方法如下:
import com.auto.fault.framework.funtester.frame.SourceCode
import com.auto.fault.framework.utils.k8s.K8sLog
import groovy.util.logging.Log4j2
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import io.fabric8.kubernetes.client.KubernetesClient
import java.util.concurrent.ScheduledFuture
@Log4j2
class TesClient extends SourceCode {
public static void main(String[] args) {
KubernetesClient client = new DefaultKubernetesClient()
ScheduledFuture<?> logs = K8sLog.handleLogs("funtester", "funtester-0", {
println it
})
waitForKey("Press any key to exit")
logs.cancel(true)
client.close()
}
}
原来我也想通过一个去重的队列实现,发现 Java 本身并没有提供这个能力,如果单独写一个比较麻烦,得不偿失,最终也放弃了。
基于 LinkedHashMap
这里分享一下 LinkedHashMap 方案,因为 removeEldestEntry()
方法让我学到了新知识,本来我打算用 Caffeine
实现的,没想到 Java 还提供了替代方案。
基于插入顺序
import java.util.*;
public class LogProcessor {
private static final int MAX_ENTRIES = 10000; // 只存最近的日志 ID
private static final LinkedHashMap<String, Long> processedLogs =
new LinkedHashMap<>(MAX_ENTRIES, 0.75f) {
@Override
protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
return size() > MAX_ENTRIES;
}
};
public void processLogs(List<Log> logs) {
long now = System.currentTimeMillis();
for (Log log : logs) {
if (processedLogs.containsKey(log.getId())) {
continue;
}
processedLogs.put(log.getId(), now);
processLog(log);
}
}
private void processLog(Log log) {
System.out.println("FunTester Processing log: " + log);
}
}
removeEldestEntry
是 LinkedHashMap 提供的一个受保护(protected)方法,用于控制缓存的大小。当 LinkedHashMap 作为 LRU 缓存(最近最少使用缓存) 使用时,可以重写该方法,在元素数量超过限制时自动移除最早插入的元素。
- eldest 参数表示当前 LinkedHashMap 中 最老(最早插入)的键值对。
- 返回值 true 时,eldest 会被移除;返回 false,则不会移除。
基于最近访问顺序
要根据访问时间删除 LinkedHashMap 中的旧数据,可以利用 LinkedHashMap 的 accessOrder=true 特性,让最近访问的数据排在后面,并在 removeEldestEntry 方法中检查数据的时间戳是否超时,超时则删除。适用于 基于时间的自动清理缓存,如日志、会话管理等。
实现步骤
- 启用 LRU 访问顺序:创建 LinkedHashMap 时,设置 accessOrder=true,使最近访问的数据自动移动到队尾。
- 存储时间戳:在 LinkedHashMap 中存储键值对,值包含数据的时间戳。
- 按访问时间删除:在 removeEldestEntry 方法中,检查最老的元素是否超过设定的超时时间(如 10 分钟),如果超时,则返回 true 进行删除。
实现方法:
import java.util.*;
public class AccessTimeCache<K, V> extends LinkedHashMap<K, V> {
private final long EXPIRATION_TIME_MS; // 过期时间,单位毫秒
public AccessTimeCache(int capacity, long expirationTimeMs) {
super(capacity, 0.75f, true); // accessOrder=true,启用 LRU
this.EXPIRATION_TIME_MS = expirationTimeMs;
}
@Override
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
long currentTime = System.currentTimeMillis();
// 假设 V 是一个包含时间戳的对象,这里需替换成实际数据结构
if (eldest.getValue() instanceof CacheItem) {
CacheItem item = (CacheItem) eldest.getValue();
return (currentTime - item.timestamp) > EXPIRATION_TIME_MS;
}
return false;
}
// 模拟存储数据时的结构
static class CacheItem {
String data;
long timestamp;
public CacheItem(String data) {
this.data = data;
this.timestamp = System.currentTimeMillis();
}
}
}
当然我们还可以根据时间+条目总数来控制,这里就不再赘述了。