flume 1.7.0-taildirSource 支持 windows系统

时间:2021-12-22 05:57:18

Flume-ng 1.7.0 中增加了TaildirSource,可以监控目录中文件的变化自动读取文件内容。
不过实际应用时发现几个问题:

1,不支持windows系统。
2,windows下会影响 log4j 日志文件的切分,会使log4j日志不切分一直增大,flume停了才会 切分日志。

不支持 windows 系统的问题是因为 taildirSource 的源码ReliableTaildirEventReader类的getInode(File file) 方法中的代码依赖linux系统。

 private long getInode(File file) throws IOException {
long inode = (long) Files.getAttribute(file.toPath(), "unix:ino");
return inode;
}

window系统下的JDK不支持:Files.getAttribute(file.toPath(), “unix:ino”)。不支持咋办?当然是想办法实现一个啊。Linux系统下可以使用inode跟踪一个文件,文件被重命名inode不会改变。Windows的NTFS系统有类似的东东,具体实现方式如下:

1.添加依赖

          <dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
<version>4.2.2</version>
</dependency>
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna-platform</artifactId>
<version>4.2.2</version>
</dependency>

2.新建Kernel32 类


import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.sun.jna.Library;
import com.sun.jna.Native;
import com.sun.jna.Structure;
import com.sun.jna.platform.win32.WinBase.FILETIME;
import com.sun.jna.platform.win32.WinDef.DWORD;
import com.sun.jna.platform.win32.WinNT.HANDLE;
import com.sun.jna.win32.StdCallLibrary;
import com.sun.jna.win32.W32APIFunctionMapper;
import com.sun.jna.win32.W32APITypeMapper;

/**
* Created by yangyibo on 17/3/14.
*/

public interface Kernel32 extends StdCallLibrary {
final static Map WIN32API_OPTIONS = new HashMap() {
private static final long serialVersionUID = 1L;

{
put(Library.OPTION_FUNCTION_MAPPER, W32APIFunctionMapper.UNICODE);
put(Library.OPTION_TYPE_MAPPER, W32APITypeMapper.UNICODE);
}
};

Kernel32 INSTANCE = (Kernel32) Native.loadLibrary("Kernel32",
Kernel32.class, WIN32API_OPTIONS);

int GetLastError();

class BY_HANDLE_FILE_INFORMATION extends Structure {
public DWORD dwFileAttributes;
public FILETIME ftCreationTime;
public FILETIME ftLastAccessTime;
public FILETIME ftLastWriteTime;
public DWORD dwVolumeSerialNumber;
public DWORD nFileSizeHigh;
public DWORD nFileSizeLow;
public DWORD nNumberOfLinks;
public DWORD nFileIndexHigh;
public DWORD nFileIndexLow;

public static class ByReference extends BY_HANDLE_FILE_INFORMATION
implements Structure.ByReference {


}

;

public static class ByValue extends BY_HANDLE_FILE_INFORMATION
implements Structure.ByValue {


}

@Override
protected List getFieldOrder() {
List fields = new ArrayList();
fields.addAll(Arrays.asList(new String[]{"dwFileAttributes",
"ftCreationTime", "ftLastAccessTime", "ftLastWriteTime",
"dwVolumeSerialNumber", "nFileSizeHigh", "nFileSizeLow",
"nNumberOfLinks", "nFileIndexHigh", "nFileIndexLow"}));
return fields;

}

;
}

;

boolean GetFileInformationByHandle(HANDLE hFile,
BY_HANDLE_FILE_INFORMATION lpFileInformation);
}

3.新建WinFileUtil 类

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sun.jna.platform.win32.Kernel32;
import com.sun.jna.platform.win32.WinBase;
import com.sun.jna.platform.win32.WinNT.HANDLE;

import java.io.File;
import java.nio.file.Files;

/**
* Created by yangyibo on 17/3/14.
*/

public class WinFileUtil {

public static WinFileUtil getWinFile(){
return new WinFileUtil();
}
private static Logger logger = LoggerFactory.getLogger(WinFileUtil.class);

public static String getFileId(String filepath) {

final int FILE_SHARE_READ = (0x00000001);
final int OPEN_EXISTING = (3);
final int GENERIC_READ = (0x80000000);
final int FILE_ATTRIBUTE_ARCHIVE = (0x20);

WinBase.SECURITY_ATTRIBUTES attr = null;
com.wh.example.Kernel32.BY_HANDLE_FILE_INFORMATION lpFileInformation = new com.wh.example.Kernel32.BY_HANDLE_FILE_INFORMATION();
HANDLE hFile = null;

hFile = Kernel32.INSTANCE.CreateFile(filepath, 0,
FILE_SHARE_READ, attr, OPEN_EXISTING, FILE_ATTRIBUTE_ARCHIVE,
null);
String ret = "0";
if (Kernel32.INSTANCE.GetLastError() == 0) {

com.wh.example.Kernel32.INSTANCE
.GetFileInformationByHandle(hFile, lpFileInformation);

ret = lpFileInformation.dwVolumeSerialNumber.toString()
+ lpFileInformation.nFileIndexLow.toString();

Kernel32.INSTANCE.CloseHandle(hFile);

if (Kernel32.INSTANCE.GetLastError() == 0) {
logger.debug("inode:" + ret);
return ret;
} else {
logger.error("关闭文件发生错误:{}", filepath);
throw new RuntimeException("关闭文件发生错误:" + filepath);
}
} else {
if (hFile != null) {
Kernel32.INSTANCE.CloseHandle(hFile);
}
logger.error("打开文件发生错误:{}", filepath);
throw new RuntimeException("打开文件发生错误:" + filepath);
}

}

public static void main(String[] args) throws Exception {
File f=new File("/Users/yangyibo/Idea/fileInfoTest/target/logs/fileInfo.log");
System.out.println(f.toPath());
System.out.println(Long.valueOf("1232188718728378"));
System.out.println("file ino: "+Files.getAttribute(f.toPath(), "unix:ino"));
}
}

4.修改源码

    public static final String OS_NAME = System.getProperty("os.name").toLowerCase();

private long getInode(File file) throws IOException {
long inode;
if (OS_NAME.contains("windows")) {
inode = Long.parseLong(WinFileUtil.getFileId(file.toPath().toString()));
} else {
inode = (long) Files.getAttribute(file.toPath(), "unix:ino");
}
return inode;
}

这样就解决了windows 的第一个问题,关于第二个问题没有很好的解决方案。如果有好的解决方法,私聊我哦。

参考文章:http://www.xue163.com/2280/1/22800498.html