FTP/FTPS 适配器

时间:2022-12-09 10:59:29

FTP/FTPS 适配器

Spring 集成为 FTP 和 FTP 的文件传输操作提供支持。

文件传输协议 (FTP) 是一种简单的网络协议,可让您在 Internet 上的两台计算机之间传输文件。 FTPS 代表 “FTP over SSL”。

您需要将此依赖项包含在项目中:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ftp</artifactId>
<version>6.0.0</version>
</dependency>

在FTP通信方面,有两个参与者:客户端和服务器。 若要使用 FTP 或 FTPS 传输文件,请使用启动与运行 FTP 服务器的远程计算机的连接的客户端。 建立连接后,客户端可以选择发送或接收文件副本。

Spring 集成通过提供三个客户端端点来支持通过 FTP 或 FTPS 发送和接收文件:入站通道适配器、出站通道适配器和出站网关。 它还提供了方便的基于命名空间的配置选项,用于定义这些客户端组件。

若要使用 FTP 命名空间,请将以下内容添加到 XML 文件的标头中:

xmlns:int-ftp="http://www.springframework.org/schema/integration/ftp"
xsi:schemaLocation="http://www.springframework.org/schema/integration/ftp
https://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd"

FTP 会话工厂

Spring 集成提供了可用于创建 FTP(或 FTPS)会话的工厂。

默认工厂

从版本 3.0 开始,默认情况下不再缓存会话。 请参阅 FTP 会话缓存。

在配置 FTP 适配器之前,必须配置 FTP 会话工厂。 您可以使用常规 Bean 定义配置 FTP 会话工厂,其中实现类为 。 以下示例显示了一个基本配置:​​o.s.i.ftp.session.DefaultFtpSessionFactory​

<bean 
class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
<property name="host" value="localhost"/>
<property name="port" value="22"/>
<property name="username" value="kermit"/>
<property name="password" value="frog"/>
<property name="clientMode" value="0"/>
<property name="fileType" value="2"/>
<property name="bufferSize" value="100000"/>
</bean>

对于 FTPS 连接,您可以改用。​​o.s.i.ftp.session.DefaultFtpsSessionFactory​

以下示例显示了一个完整的配置:

<bean 
class="org.springframework.integration.ftp.session.DefaultFtpsSessionFactory">
<property name="host" value="localhost"/>
<property name="port" value="22"/>
<property name="username" value="oleg"/>
<property name="password" value="password"/>
<property name="clientMode" value="1"/>
<property name="fileType" value="2"/>
<property name="useClientMode" value="true"/>
<property name="cipherSuites" value="a,b.c"/>
<property name="keyManager" ref="keyManager"/>
<property name="protocol" value="SSL"/>
<property name="trustManager" ref="trustManager"/>
<property name="prot" value="P"/>
<property name="needClientAuth" value="true"/>
<property name="authValue" value="oleg"/>
<property name="sessionCreation" value="true"/>
<property name="protocols" value="SSL, TLS"/>
<property name="implicit" value="true"/>
</bean>

如果您遇到连接问题,并且想要跟踪会话创建以及查看轮询了哪些会话,则可以通过将记录器设置为级别来启用会话跟踪(例如,)。​​TRACE​​​​log4j.category.org.springframework.integration.file=TRACE​

现在,您只需将这些会话工厂注入适配器。 适配器使用的协议(FTP 或 FTPS)取决于已注入适配器的会话工厂的类型。

为 FTP 或 FTPS 会话工厂提供值的一种更实用的方法是使用 Spring 的属性占位符支持(参见 https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#beans-factory-placeholderconfigurer)。

高级配置

​DefaultFtpSessionFactory​​提供对底层客户端API的抽象,该API(从Spring Integration 2.0开始)是Apache Commons Net。 这样就可以避免 的低级配置详细信息。 会话工厂上公开了几个常见属性(从版本 4.0 开始,现在包括 、 和 )。 但是,有时需要访问较低级别的配置才能实现更高级的配置(例如设置活动模式的端口范围)。 为此,(所有 FTP 会话工厂的基类)以以下清单中所示的两种后处理方法的形式公开钩子:​​org.apache.commons.net.ftp.FTPClient​​​​connectTimeout​​​​defaultTimeout​​​​dataTimeout​​​​FTPClient​​​​AbstractFtpSessionFactory​

/**
* Will handle additional initialization after client.connect() method was invoked,
* but before any action on the client has been taken
*/
protected void postProcessClientAfterConnect(T t) throws IOException {
// NOOP
}
/**
* Will handle additional initialization before client.connect() method was invoked.
*/
protected void postProcessClientBeforeConnect(T client) throws IOException {
// NOOP
}

如您所见,这两种方法没有默认实现。 但是,通过扩展 ,您可以覆盖这些方法以提供更高级的配置,如以下示例所示:​​DefaultFtpSessionFactory​​​​FTPClient​

public class AdvancedFtpSessionFactory extends DefaultFtpSessionFactory {

protected void postProcessClientBeforeConnect(FTPClient ftpClient) throws IOException {
ftpClient.setActivePortRange(4000, 5000);
}
}

FTPS 和共享 SSL 讨论

当通过 SSL 或 TLS 使用 FTP 时,某些服务器要求在控制和数据连接上使用相同的 FTP。 这是为了防止“窃取”数据连接。 有关详细信息,请参阅 https://scarybeastsecurity.blogspot.cz/2009/02/vsftpd-210-released.html。SSLSession

目前,Apache FTPSClient 不支持此功能。 见NET-408。

以下解决方案由 Stack Overflow 提供,在 上使用反射,因此它可能不适用于其他 JVM。 堆栈溢出答案是在 2015 年提交的,Spring 集成团队已经在 JDK 1.8.0_112 上测试了该解决方案。​​sun.security.ssl.SSLSessionContextImpl​

以下示例演示如何创建 FTPS 会话:

@Bean
public DefaultFtpsSessionFactory sf() {
DefaultFtpsSessionFactory sf = new DefaultFtpsSessionFactory() {

@Override
protected FTPSClient createClientInstance() {
return new SharedSSLFTPSClient();
}

};
sf.setHost("...");
sf.setPort(21);
sf.setUsername("...");
sf.setPassword("...");
sf.setNeedClientAuth(true);
return sf;
}

private static final class SharedSSLFTPSClient extends FTPSClient {

@Override
protected void _prepareDataSocket_(final Socket socket) throws IOException {
if (socket instanceof SSLSocket) {
// Control socket is SSL
final SSLSession session = ((SSLSocket) _socket_).getSession();
final SSLSessionContext context = session.getSessionContext();
context.setSessionCacheSize(0); // you might want to limit the cache
try {
final Field sessionHostPortCache = context.getClass()
.getDeclaredField("sessionHostPortCache");
sessionHostPortCache.setAccessible(true);
final Object cache = sessionHostPortCache.get(context);
final Method method = cache.getClass().getDeclaredMethod("put", Object.class,
Object.class);
method.setAccessible(true);
String key = String.format("%s:%s", socket.getInetAddress().getHostName(),
String.valueOf(socket.getPort())).toLowerCase(Locale.ROOT);
method.invoke(cache, key, session);
key = String.format("%s:%s", socket.getInetAddress().getHostAddress(),
String.valueOf(socket.getPort())).toLowerCase(Locale.ROOT);
method.invoke(cache, key, session);
}
catch (NoSuchFieldException e) {
// Not running in expected JRE
logger.warn("No field sessionHostPortCache in SSLSessionContext", e);
}
catch (Exception e) {
// Not running in expected JRE
logger.warn(e.getMessage());
}
}

}

}

委派会话工厂

版本 4.2 引入了 ,它允许在运行时选择实际的会话工厂。 在调用 FTP 终结点之前,请调用工厂以将密钥与当前线程关联。 然后使用该键查找要使用的实际会话工厂。 您可以通过在使用后调用来清除密钥。​​DelegatingSessionFactory​​​​setThreadKey()​​​​clearThreadKey()​

我们添加了方便的方法,以便您可以轻松地从消息流使用委派会话工厂。

以下示例演示如何声明委派会话工厂:

<bean  class="org.springframework.integration.file.remote.session.DelegatingSessionFactory">
<constructor-arg>
<bean class="o.s.i.file.remote.session.DefaultSessionFactoryLocator">
<!-- delegate factories here -->
</bean>
</constructor-arg>
</bean>

<int:service-activator input-channel="in" output-channel="c1"
expression="@dsf.setThreadKey(#root, headers['factoryToUse'])" />

<int-ftp:outbound-gateway request-channel="c1" reply-channel="c2" ... />

<int:service-activator input-channel="c2" output-channel="out"
expression="@dsf.clearThreadKey(#root)" />

使用会话缓存(请参阅 FTP 会话缓存​)时,应缓存每个委托。 您无法缓存本身。​​DelegatingSessionFactory​

从版本 5.0.7 开始,可以与 a 结合使用以轮询多个服务器;请参阅入站通道适配器:轮询多个服务器和目录。​​DelegatingSessionFactory​​​​RotatingServerAdvice​

FTP 入站通道适配器

FTP 入站通道适配器是一个特殊的侦听器,它连接到 FTP 服务器并侦听远程目录事件(例如,创建的新文件),此时它将启动文件传输。 以下示例显示如何配置:​​inbound-channel-adapter​

<int-ftp:inbound-channel-adapter 
channel="ftpChannel"
session-factory="ftpSessionFactory"
auto-create-local-directory="true"
delete-remote-files="true"
filename-pattern="*.txt"
remote-directory="some/remote/path"
remote-file-separator="/"
preserve-timestamp="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
local-directory=".">
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>

如前面的配置所示,您可以使用该元素配置 FTP 入站通道适配器,同时还为各种属性提供值,例如 、(基于简单模式匹配,而不是正则表达式)和对 .​​inbound-channel-adapter​​​​local-directory​​​​filename-pattern​​​​session-factory​

默认情况下,传输的文件与原始文件的名称相同。 如果要覆盖此行为,可以设置该属性,该属性允许您提供 SpEL 表达式来生成本地文件的名称。 与出站网关和适配器不同,其中 SpEL 评估上下文的根对象是 ,此入站适配器在评估时还没有消息,因为这是它最终使用传输的文件作为其有效负载生成的消息。 因此,SpEL 评估上下文的根对象是远程文件的原始名称 (a)。​​local-filename-generator-expression​​​​Message​​​​String​

入站通道适配器首先检索本地目录的对象,然后根据轮询器配置发出每个文件。 从版本 5.0 开始,您现在可以在需要新文件检索时限制从 FTP 服务器获取的文件数。 当目标文件非常大或在具有持久文件列表筛选器的群集系统中运行时,这可能很有用,稍后将讨论。 用于此目的。 负值(默认值)表示没有限制,将检索所有匹配的文件。 有关详细信息,请参阅入站通道适配器:控制远程文件提取。 从版本 5.0 开始,您还可以通过设置属性来提供自定义实现。​​File​​​​max-fetch-size​​​​DirectoryScanner​​​​inbound-channel-adapter​​​​scanner​

从 Spring Integration 3.0 开始,您可以指定属性(其默认值为 )。 当 时,本地文件的修改时间戳设置为从服务器检索的值。 否则,它将设置为当前时间。​​preserve-timestamp​​​​false​​​​true​

从版本 4.2 开始,您可以指定 而不是 ,以便动态确定每次轮询的目录,例如 。​​remote-directory-expression​​​​remote-directory​​​​remote-directory-expression="@myBean.determineRemoteDir()"​

从版本 4.3 开始,可以省略 和 属性。 它们默认为 . 在这种情况下,根据FTP协议,客户端工作目录用作默认的远程目录。​​remote-directory​​​​remote-directory-expression​​​​null​

有时,基于使用该属性指定的简单模式进行文件筛选可能还不够。 如果是这种情况,可以使用该属性指定正则表达式(如 )。 此外,如果需要完全控制,则可以使用该属性并提供对 的任何自定义实现的引用,这是一个用于筛选文件列表的策略接口。 此筛选器确定检索哪些远程文件。 您还可以使用.​​filename-pattern​​​​filename-regex​​​​filename-regex=".*\.test$"​​​​filter​​​​o.s.i.file.filters.FileListFilter​​​​AcceptOnceFileListFilter​​​​CompositeFileListFilter​

将其状态存储在内存中。 如果希望状态在系统重新启动后继续存在,请考虑改用 。 此筛选器将接受的文件名存储在策略的实例中(请参阅元数据存储)。 此过滤器与文件名和远程修改时间匹配。​​AcceptOnceFileListFilter​​​​FtpPersistentAcceptOnceFileListFilter​​​​MetadataStore​

从版本 4.0 开始,此筛选器需要 . 与共享数据存储(例如与 一起使用)一起使用时,它允许在多个应用程序或服务器实例之间共享筛选器键。​​ConcurrentMetadataStore​​​​Redis​​​​RedisMetadataStore​

从版本 5.0 开始,默认情况下对 . 此过滤器也与 XML 配置中的 or 选项以及 Java DSL 中的 or 选项一起应用。 任何其他用例都可以使用 (或 ) 进行管理。​​FtpPersistentAcceptOnceFileListFilter​​​​SimpleMetadataStore​​​​FtpInboundFileSynchronizer​​​​regex​​​​pattern​​​​FtpInboundChannelAdapterSpec​​​​CompositeFileListFilter​​​​ChainFileListFilter​

前面的讨论涉及在检索文件之前筛选文件。 检索文件后,将对文件系统上的文件应用其他筛选器。 默认情况下,如前所述,这会在内存中保留状态,并且不考虑文件的修改时间。 除非应用程序在处理后删除文件,否则适配器将在应用程序重新启动后默认重新处理磁盘上的文件。​​AcceptOnceFileListFilter​

此外,如果将 配置为使用 并且远程文件时间戳更改(导致重新获取),则默认本地筛选器不允许处理此新文件。​​filter​​​​FtpPersistentAcceptOnceFileListFilter​

有关此筛选器及其使用方式的详细信息,请参阅远程持久文件列表筛选器。

您可以使用该属性来配置本地文件系统筛选器的行为。 从版本 4.3.8 开始,默认情况下配置 a。 此筛选器将接受的文件名和修改的时间戳存储在策略的实例中(请参阅元数据存储),并检测对本地文件修改时间的更改。 默认值为 ,它将状态存储在内存中。​​local-filter​​​​FileSystemPersistentAcceptOnceFileListFilter​​​​MetadataStore​​​​MetadataStore​​​​SimpleMetadataStore​

从版本 4.1.5 开始,这些筛选器具有一个新属性 (),导致它们刷新 每次更新时的元数据存储(如果存储实现)。​​flushOnUpdate​​​​Flushable​

此外,如果使用分布式(如 Redis​),则可以具有同一适配器或应用程序的多个实例,并确保每个文件仅处理一次。​​MetadataStore​

实际的本地过滤器是包含提供的过滤器和模式过滤器,用于阻止处理正在下载的文件(基于 )。 使用此后缀下载文件(默认值为 ),并在传输完成后将文件重命名为其最终名称,使其对过滤器“可见”。​​CompositeFileListFilter​​​​temporary-file-suffix​​​​.writing​

该属性允许您配置文件分隔符,以便在默认“/”不适用于您的特定环境时使用。​​remote-file-separator​

有关这些属性的更多详细信息,请参阅架构。

您还应该了解 FTP 入站通道适配器是轮询使用者。 因此,必须配置轮询器(通过使用全局默认值或本地子元素)。 传输文件后,将生成一条以 a 作为其有效负载的消息,并将其发送到由该属性标识的通道。​​java.io.File​​​​channel​

详细了解文件过滤和不完整文件

有时,刚刚出现在受监视(远程)目录中的文件不完整。 通常,此类文件使用临时扩展名(如)写入,然后在写入过程完成后重命名。 在大多数情况下,您只对完整的文件感兴趣,并且只想筛选完整的文件。 若要处理这些方案,可以使用 、 和 属性提供的筛选支持。 以下示例使用自定义筛选器实现:​​somefile.txt.writing​​​​filename-pattern​​​​filename-regex​​​​filter​

<int-ftp:inbound-channel-adapter
channel="ftpChannel"
session-factory="ftpSessionFactory"
filter="customFilter"
local-directory="file:/my_transfers">
remote-directory="some/remote/path"
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>

<bean class="org.example.CustomFilter"/>

入站 FTP 适配器的轮询器配置说明

入站 FTP 适配器的作业由两个任务组成:

  1. 与远程服务器通信,以便将文件从远程目录传输到本地目录。
  2. 对于每个传输的文件,生成一条消息,将该文件作为有效负载,并将其发送到由“channel”属性标识的通道。 这就是为什么它们被称为“通道适配器”而不仅仅是“适配器”。 这种适配器的主要工作是生成要发送到消息通道的消息。 从本质上讲,第二个任务优先,如果您的本地目录已经有一个或多个文件,它首先从这些文件生成消息。 仅当处理完所有本地文件后,它才会启动远程通信以检索更多文件。

此外,在轮询器上配置触发器时,应密切注意该属性。 其默认值适用于所有实例(包括 FTP)。 这意味着,一旦处理了一个文件,它就会等待由触发器配置确定的下一个执行时间。 如果您碰巧有一个或多个文件位于 中,它将在启动与远程 FTP 服务器的通信之前处理这些文件。 此外,如果设置为 (默认值),则它一次仅处理一个文件,时间间隔由触发器定义,本质上是“一个轮询 === 一个文件”。​​max-messages-per-poll​​​​1​​​​SourcePollingChannelAdapter​​​​local-directory​​​​max-messages-per-poll​​​​1​

对于典型的文件传输用例,您很可能希望采取相反的行为:处理每次轮询的所有文件,然后才等待下一次轮询。 如果是这种情况,请设置为 -1。 然后,在每次轮询时,适配器都会尝试生成尽可能多的消息。 换句话说,它处理本地目录中的所有内容,然后连接到远程目录以传输所有可用内容以在本地处理。 只有这样,轮询操作才被视为完成,轮询器等待下一个执行时间。​​max-messages-per-poll​

您也可以将“每次轮询的最大消息数”值设置为正值,该值指示每次轮询时要从文件创建的消息的向上限制。 例如,值 of 表示在每次轮询时,它尝试处理的文件不超过十个。​​10​

从故障中恢复

了解适配器的体系结构非常重要。 有一个文件同步器可以获取文件,还有一个为每个文件发出消息 同步文件。 如前所述,涉及两个筛选器。 属性(和模式)引用远程 (FTP) 文件列表,以避免获取已 被取回。 用于确定哪些文件将作为消息发送。​​FileReadingMessageSource​​​​filter​​​​local-filter​​​​FileReadingMessageSource​

同步器列出远程文件并查阅其筛选器。 然后传输文件。 如果在文件传输过程中发生 IO 错误,则会删除已添加到筛选器的任何文件,以便它们 有资格在下一次投票中重新获取。 这仅适用于过滤器实现(如 )。​​ReversibleFileListFilter​​​​AcceptOnceFileListFilter​

如果在同步文件后,处理文件的下游流发生错误,则不会自动回滚筛选器,因此默认情况下不会重新处理失败的文件。

如果您希望在失败后重新处理此类文件,可以使用类似于以下内容的配置来方便 从筛选器中删除失败的文件:

<int-ftp:inbound-channel-adapter 
session-factory="ftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/ftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-ftp:inbound-channel-adapter>

<bean
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />

<int:transaction-synchronization-factory >
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>

<bean
class="org.springframework.integration.transaction.PseudoTransactionManager" />

上述配置适用于任何 .​​ResettableFileListFilter​

从 V5.0 开始,入站通道适配器可以在本地构建与生成的本地文件名对应的子目录。 这也可以是远程子路径。 为了能够根据层次结构支持递归读取本地目录以进行修改,您现在可以根据算法为内部目录提供新的目录。 有关更多信息,请参阅 AbstractInboundFileSynchronizingMessageSource.setScanner()。 此外,您现在可以将 切换到 -based by using 选项。 它还配置为所有实例对本地目录中的任何修改做出反应。 前面显示的重新处理示例基于 从本地目录删除文件时要执行的内置功能 ()。 有关详细信息,请参阅 WatchServiceDirectoryScanner。​​FileReadingMessageSource​​​​RecursiveDirectoryScanner​​​​Files.walk()​​​​AbstractInboundFileSynchronizingMessageSource​​​​WatchService​​​​DirectoryScanner​​​​setUseWatchService()​​​​WatchEventType​​​​FileReadingMessageSource.WatchServiceDirectoryScanner​​​​ResettableFileListFilter.remove()​​​​StandardWatchEventKinds.ENTRY_DELETE​

使用 Java 配置进行配置

以下 Spring 引导应用程序显示了如何使用 Java 配置配置入站适配器的示例:

@SpringBootApplication
public class FtpJavaApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}

@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}

@Bean
public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}

@Bean
@InboundChannelAdapter(channel = "ftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> ftpMessageSource() {
FtpInboundFileSynchronizingMessageSource source =
new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
source.setLocalDirectory(new File("ftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}

@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
return new MessageHandler() {

@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}

};
}

}

使用 Java DSL 进行配置

以下 Spring 引导应用程序显示了如何使用 Java DSL 配置入站适配器的示例:

@SpringBootApplication
public class FtpJavaApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}

@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlow
.from(Ftp.inboundAdapter(this.ftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilename(f -> f.toUpperCase() + ".a")
.localDirectory(new File("d:\\ftp_files")),
e -> e.id("ftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}

处理不完整的数据

请参阅处理不完整的数据。

提供 用于过滤远程系统上没有相应标记文件的远程文件。 有关配置信息,请参阅 Javadoc(并浏览到父类)。FtpSystemMarkerFilePresentFileListFilter

FTP 流式处理入站通道适配器

版本 4.3 引入了流式入站通道适配器。 此适配器生成有效负载类型的消息,允许在不写入 本地文件系统。 由于会话保持打开状态,因此使用应用程序负责在文件打开时关闭会话 消耗。 会话在标头 () 中提供。 标准框架组件(如 和 )会自动关闭会话。 有关这些组件的详细信息,请参阅文件拆分器和流转换器。 以下示例显示如何配置:​​InputStream​​​​closeableResource​​​​IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE​​​​FileSplitter​​​​StreamTransformer​​​​inbound-streaming-channel-adapter​

<int-ftp:inbound-streaming-channel-adapter 
channel="ftpChannel"
session-factory="sessionFactory"
filename-pattern="*.txt"
filename-regex=".*\.txt"
filter="filter"
filter-expression="@myFilterBean.check(#root)"
remote-file-separator="/"
comparator="comparator"
max-fetch-size="1"
remote-directory-expression="'foo/bar'">
<int:poller fixed-rate="1000" />
</int-ftp:inbound-streaming-channel-adapter>

只允许 、、 或 中的一个。​​filename-pattern​​​​filename-regex​​​​filter​​​​filter-expression​

从版本 5.0 开始,默认情况下,适配器会根据内存中 . 默认情况下,此筛选器也与文件名模式(或正则表达式)一起应用。 如果需要允许重复项,可以使用 。 任何其他用例都可以由 (或 ) 处理。 Java 配置(在本文档后面)​展示了一种在处理后删除远程文件以避免重复的技术。​​FtpStreamingMessageSource​​​​FtpPersistentAcceptOnceFileListFilter​​​​SimpleMetadataStore​​​​AcceptAllFileListFilter​​​​CompositeFileListFilter​​​​ChainFileListFilter​

有关 及其使用方式的详细信息,请参阅远程持久文件列表筛选器。FtpPersistentAcceptOnceFileListFilter

当需要提取时,使用该属性可以限制每次轮询时提取的文件数。 将其设置为在群集环境中运行时使用持久性筛选器。 有关详细信息,请参阅入站通道适配器:控制远程文件提取。max-fetch-size1

适配器将远程目录和文件名分别放在 和 标头中。 从版本 5.0 开始,标头提供其他远程文件信息(默认情况下以 JSON 表示)。 如果在 上设置属性 to ,则标头包含一个对象。 可以使用该方法访问底层 Apache Net 库提供的对象。 使用 XML 配置时,该属性不可用,但可以通过将 注入到其中一个配置类中进行设置。 另请参阅远程文件信息。​​FileHeaders.REMOTE_DIRECTORY​​​​FileHeaders.REMOTE_FILE​​​​FileHeaders.REMOTE_FILE_INFO​​​​fileInfoJson​​​​FtpStreamingMessageSource​​​​false​​​​FtpFileInfo​​​​FTPFile​​​​FtpFileInfo.getFileInfo()​​​​fileInfoJson​​​​FtpStreamingMessageSource​

从版本 5.1 开始,泛型类型为 。 以前,它是. 这是因为排序现在在处理的早期执行,在过滤和应用 之前。​​comparator​​​​FTPFile​​​​AbstractFileInfo<FTPFile>​​​​maxFetch​

使用 Java 配置进行配置

以下 Spring 引导应用程序显示了如何使用 Java 配置配置入站适配器的示例:

@SpringBootApplication
public class FtpJavaApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}

@Bean
@InboundChannelAdapter(channel = "stream")
public MessageSource<InputStream> ftpMessageSource() {
FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template());
messageSource.setRemoteDirectory("ftpSource/");
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
return messageSource;
}

@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer("UTF-8");
}

@Bean
public FtpRemoteFileTemplate template() {
return new FtpRemoteFileTemplate(ftpSessionFactory());
}

@ServiceActivator(inputChannel = "data", adviceChain = "after")
@Bean
public MessageHandler handle() {
return System.out::println;
}

@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression(
"@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
advice.setPropagateEvaluationFailures(true);
return advice;
}

}

请注意,在此示例中,转换器下游的消息处理程序具有在处理后删除远程文件的消息处理程序。​​advice​

入站通道适配器:轮询多个服务器和目录

从版本 5.0.7 开始,可用;当配置为轮询器建议时,入站适配器可以轮询多个服务器和目录。 配置建议并将其正常添加到轮询器的建议链中。 A 用于选择服务器,有关详细信息,请参阅​​委派会话工厂​​。 建议配置由对象列表组成。​​RotatingServerAdvice​​​​DelegatingSessionFactory​​​​RotationPolicy.KeyDirectory​

@Bean
public RotatingServerAdvice advice() {
List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}

此建议将轮询服务器上的目录,直到不存在新文件,然后移动到目录,然后移动到服务器上的目录,等等。​​foo​​​​one​​​​bar​​​​baz​​​​two​

可以使用构造函数 arg 修改此默认行为:​​fair​

公平

@Bean
public RotatingServerAdvice advice() {
...
return new RotatingServerAdvice(delegatingSf(), keyDirectories, true);
}

在这种情况下,无论上一个轮询是否返回文件,建议都将移动到下一个服务器/目录。

或者,您可以根据需要提供自己的消息源以重新配置消息源:​​RotationPolicy​

政策

public interface RotationPolicy {

void beforeReceive(MessageSource<?> source);

void afterReceive(boolean messageReceived, MessageSource<?> source);

}

习惯

@Bean
public RotatingServerAdvice advice() {
return new RotatingServerAdvice(myRotationPolicy());
}

属性(在同步器上)现在可以包含变量。 这允许将从不同目录检索的文件下载到本地的类似目录:​​local-filename-generator-expression​​​​localFilenameGeneratorExpression​​​​#remoteDirectory​

@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from(Ftp.inboundAdapter(sf())
.filter(new FtpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
.localDirectory(new File(tmpDir))
.localFilenameExpression("#remoteDirectory + T(java.io.File).separator + #root")
.remoteDirectory("."),
e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
.channel(MessageChannels.queue("files"))
.get();
}

使用此建议时,请勿在轮询器上配置 a;有关详细信息,请参阅消息源的条件轮询器​。​​TaskExecutor​

入站通道适配器:控制远程文件提取

配置入站通道适配器时应考虑两个属性。,与所有轮询器一样,可用于限制每次轮询时发出的消息数(如果已准备好的数值超过配置的值)。 (从版本 5.0 开始)可以限制一次从远程服务器检索的文件数。​​max-messages-per-poll​​​​max-fetch-size​

以下方案假定起始状态为空本地目录:

  • ​max-messages-per-poll=2​​和 :适配器提取一个文件,发出它,提取下一个文件,发出它,然后休眠,直到下一次轮询。max-fetch-size=1
  • ​max-messages-per-poll=2​​和 ):适配器获取这两个文件,然后发出每个文件。max-fetch-size=2
  • ​max-messages-per-poll=2​​和 :适配器最多获取四个文件(如果可用)并发出前两个文件(如果至少有两个)。 接下来的两个文件在下一次轮询时发出。max-fetch-size=4
  • ​max-messages-per-poll=2​​未指定:适配器获取所有远程文件并发出前两个文件(如果至少有两个)。 后续文件在后续轮询中发出(一次两个)。 当所有文件都被使用时,将再次尝试远程提取,以选取任何新文件。max-fetch-size

部署应用程序的多个实例时,我们建议使用小型 ,以避免一个实例“抓取”所有文件并使其他实例陷入饥饿。​​max-fetch-size​

的另一个用途是,如果要停止提取远程文件,但继续处理已提取的文件。 在 上设置属性(以编程方式、使用 JMX 或控制总线)有效地阻止适配器获取更多文件,但允许轮询器继续发出以前已提取的文件的消息。 如果轮询器在更改属性时处于活动状态,则更改将在下一次轮询时生效。​​max-fetch-size​​​​maxFetchSize​​​​MessageSource​

从版本 5.1 开始,可以为同步器提供 . 这在限制使用 获取的文件数时很有用。​​Comparator<FTPFile>​​​​maxFetchSize​

FTP 出站通道适配器

FTP 出站通道适配器依赖于连接到 FTP 服务器的实现,并为传入消息的有效负载中接收的每个文件启动 FTP 传输。 它还支持文件的多种表示形式,因此您不仅限于 -类型化有效负载。 FTP 出站通道适配器支持以下有效负载:​​MessageHandler​​​​java.io.File​

  • ​java.io.File​​:实际文件对象
  • ​byte[]​​:表示文件内容的字节数组
  • ​java.lang.String​​:表示文件内容的文本
  • ​java.io.InputStream​​:要传输到远程文件的数据流
  • ​org.springframework.core.io.Resource​​:用于将数据传输到远程文件的资源

以下示例显示如何配置:​​outbound-channel-adapter​

<int-ftp:outbound-channel-adapter 
channel="ftpChannel"
session-factory="ftpSessionFactory"
charset="UTF-8"
remote-file-separator="/"
auto-create-directory="true"
remote-directory-expression="headers['remote_dir']"
temporary-remote-directory-expression="headers['temp_remote_dir']"
filename-generator="fileNameGenerator"
use-temporary-filename="true"
chmod="600"
mode="REPLACE"/>

前面的配置显示了如何使用该元素配置 FTP 出站通道适配器,同时还为各种属性提供值,例如(策略接口的实现)、对 的引用和其他属性。 您还可以看到一些属性示例,这些属性允许您使用 SpEL 配置设置,例如 、 和(前面示例中所示的 SpEL 替代项)。 与任何允许使用 SpEL 的组件一样,可以通过“有效负载”和“标头”变量访问有效负载和消息标头。 有关可用属性的更多详细信息,请参阅架构。​​outbound-channel-adapter​​​​filename-generator​​​​o.s.i.file.FileNameGenerator​​​​session-factory​​​​*expression​​​​remote-directory-expression​​​​temporary-remote-directory-expression​​​​remote-filename-generator-expression​​​​filename-generator​

默认情况下,如果未指定文件名生成器,Spring 集成将使用 . 根据 中的标头(如果存在)的值确定文件名,或者,如果消息的有效负载已经是 ,则它使用该文件的原始名称。​​o.s.i.file.DefaultFileNameGenerator​​​​DefaultFileNameGenerator​​​​file_name​​​​MessageHeaders​​​​java.io.File​

定义某些值(如 )可能取决于平台或 FTP 服务器。 例如,正如 https://forum.spring.io/showthread.php?p=333478&posted=1#post333478​ 所报告的那样,在某些平台上,必须在目录定义的末尾添加斜杠(例如,而不是 )。​​remote-directory​​​​remote-directory="/thing1/thing2/"​​​​remote-directory="/thing1/thing2"​

从版本 4.1 开始,您可以在传输文件时指定 。 默认情况下,现有文件将被覆盖。 模式由枚举定义,枚举包括以下值:​​mode​​​​FileExistsMode​

  • ​REPLACE​​(默认)
  • ​REPLACE_IF_MODIFIED​
  • ​APPEND​
  • ​APPEND_NO_FLUSH​
  • ​IGNORE​
  • ​FAIL​

​IGNORE​​并且不要传输文件。 导致引发异常,同时以静默方式忽略传输(尽管生成了日志条目)。​​FAIL​​​​FAIL​​​​IGNORE​​​​DEBUG​

版本 5.2 引入了该属性,您可以使用该属性在上传后更改远程文件权限。 您可以使用传统的 Unix 八进制格式(例如,仅允许文件所有者读写)。 使用 java 配置适配器时,可以使用 或 。 仅当 FTP 服务器支持子命令时才适用。​​chmod​​​​600​​​​setChmodOctal("600")​​​​setChmod(0600)​​​​SITE CHMOD​

避免部分写入的文件

处理文件传输时出现的常见问题之一是处理部分文件的可能性。 也就是说,文件可能会在传输实际完成之前出现在文件系统中。

为了解决这个问题,Spring 集成 FTP 适配器使用一种通用算法:文件以临时名称传输,然后在完全传输后重命名。

默认情况下,每个正在传输的文件都会显示在文件系统中,并带有一个附加后缀,默认情况下为 . 您可以通过设置属性来更改此后缀。​​.writing​​​​temporary-file-suffix​

但是,在某些情况下,您可能不想使用此技术(例如,如果服务器不允许重命名文件)。 对于此类情况,可以通过设置为 (默认值为 ) 来禁用此功能。 当此属性为 时,文件将以其最终名称写入,并且使用应用程序需要某种其他机制来检测文件在访问之前是否已完全上传。​​use-temporary-file-name​​​​false​​​​true​​​​false​

使用 Java 配置进行配置

以下 Spring 引导应用程序显示了如何使用 Java 配置配置出站适配器的示例:

@SpringBootApplication
@IntegrationComponentScan
public class FtpJavaApplication {

public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToFtp(new File("/foo/bar.txt"));
}

@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}

@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
FtpMessageHandler handler = new FtpMessageHandler(ftpSessionFactory());
handler.setRemoteDirectoryExpressionString("headers['remote-target-dir']");
handler.setFileNameGenerator(new FileNameGenerator() {

@Override
public String generateFileName(Message<?> message) {
return "handlerContent.test";
}

});
return handler;
}

@MessagingGateway
public interface MyGateway {

@Gateway(requestChannel = "toFtpChannel")
void sendToFtp(File file);

}
}

使用 Java DSL 进行配置

以下 Spring 引导应用程序显示了如何使用 Java DSL 配置出站适配器的示例:

@SpringBootApplication
@IntegrationComponentScan
public class FtpJavaApplication {

public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToFtp(new File("/foo/bar.txt"));
}

@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}

@Bean
public IntegrationFlow ftpOutboundFlow() {
return IntegrationFlow.from("toFtpChannel")
.handle(Ftp.outboundAdapter(ftpSessionFactory(), FileExistsMode.FAIL)
.useTemporaryFileName(false)
.fileNameExpression("headers['" + FileHeaders.FILENAME + "']")
.remoteDirectory(this.ftpServer.getTargetFtpDirectory().getName())
).get();
}

@MessagingGateway
public interface MyGateway {

@Gateway(requestChannel = "toFtpChannel")
void sendToFtp(File file);

}

}

FTP 出站网关

FTP 出站网关提供一组有限的命令来与远程 FTP 或 FTPS 服务器进行交互。 支持的命令包括:

  • ​ls​​(列出文件)
  • ​nlst​​(列出文件名)
  • ​get​​(检索文件)
  • ​mget​​(检索文件)
  • ​rm​​(删除文件)
  • ​mv​​(移动/重命名文件)
  • ​put​​(发送文件)
  • ​mput​​(发送多个文件)

使用命令​​ls​

​ls​​列出远程文件并支持以下选项:

  • ​-1​​:检索文件名列表。 默认值是检索对象列表。FileInfo
  • ​-a​​:包括所有文件(包括以“.”开头的文件)
  • ​-f​​:不对列表进行排序
  • ​-dirs​​:包含目录(默认情况下排除它们)
  • ​-links​​:包含符号链接(默认情况下将其排除在外)
  • ​-R​​:递归列出远程目录

此外,还提供了文件名筛选,其方式与 . 请参阅 FTP 入站通道适配器。​​inbound-channel-adapter​

操作生成的消息负载是文件名列表或对象列表。 这些对象提供修改时间、权限和其他详细信息等信息。​​ls​​​​FileInfo​

命令操作的远程目录在标头中提供。​​ls​​​​file_remoteDirectory​

使用递归选项 () 时,包括任何子目录元素,表示文件的相对路径(相对于远程目录)。 如果包含该选项,则每个递归目录也会作为列表中的元素返回。 在这种情况下,建议您不要使用该选项,因为您将无法区分文件和目录,而您可以对对象进行区分。​​-R​​​​fileName​​​​-dirs​​​​-1​​​​FileInfo​

从版本 4.3 开始,对 和 方法的支持。 因此,您可以省略该属性。 为方便起见,Java 配置有两个没有参数的构造函数。 根据 FTP 协议,或 、 和命令被视为客户端工作目录。 必须提供所有其他命令,以根据请求消息评估远程路径。 您可以在扩展和实现回调时使用该函数设置工作目录。​​FtpSession​​​​null​​​​list()​​​​listNames()​​​​expression​​​​expression​​​​LS​​​​NLST​​​​PUT​​​​MPUT​​​​null​​​​expression​​​​FTPClient.changeWorkingDirectory()​​​​DefaultFtpSessionFactory​​​​postProcessClientAfterConnect()​

使用命令​​nlst​

版本 5 引入了对该命令的支持。​​nlst​

​nlst​​列出远程文件名,仅支持一个选项:

  • ​-f​​:不对列表进行排序

操作生成的消息负载是文件名列表。​​nlst​

命令操作的远程目录在标头中提供。​​nlst​​​​file_remoteDirectory​

与使用该命令的 ​​ls​​ 命令的选项不同,该命令将命令发送到目标 FTP 服务器。 当服务器不支持(例如,由于安全限制)时,此命令很有用。 操作的结果是没有其他详细信息的名称。 因此,框架无法确定实体是否为目录,例如执行筛选或递归列表。​​-1​​​​LIST​​​​nlst​​​​NLST​​​​LIST​​​​nlst​

使用命令​​get​

​get​​检索远程文件。 它支持以下选项:

  • ​-P​​:保留远程文件的时间戳。
  • ​-stream​​:以流形式检索远程文件。
  • ​-D​​:传输成功后删除远程文件。 如果忽略传输,则不会删除远程文件,因为 和本地文件已存在。FileExistsModeIGNORE

标头提供远程目录名称,标头提供文件名。​​file_remoteDirectory​​​​file_remoteFile​

操作生成的消息负载是表示检索到的文件的对象,或者是使用该选项时的对象。 该选项允许将文件作为流检索。 对于文本文件,常见的用例是将此操作与文件拆分器或流转换器结合使用。 将远程文件作为流使用时,您负责在使用流后关闭 。 为方便起见,标头中提供了 ,您可以使用 方便的方法访问 下面的示例演示如何使用便利方法:​​get​​​​File​​​​InputStream​​​​-stream​​​​-stream​​​​Session​​​​Session​​​​closeableResource​​​​IntegrationMessageHeaderAccessor​

Closeable closeable = new IntegrationMessageHeaderAccessor(message).getCloseableResource();
if (closeable != null) {
closeable.close();
}

文件拆分器和流转换器等框架组件在传输数据后自动关闭会话。

以下示例演示如何将文件用作流:

<int-ftp:outbound-gateway session-factory="ftpSessionFactory"
request-channel="inboundGetStream"
command="get"
command-options="-stream"
expression="payload"
remote-directory="ftpTarget"
reply-channel="stream" />

<int-file:splitter input-channel="stream" output-channel="lines" />

如果在自定义组件中使用输入流,则必须关闭 . 可以在自定义代码中执行此操作,也可以通过将消息副本路由到 并使用 SpEL 来执行此操作,如以下示例所示:​​Session​​​​service-activator​

<int:service-activator input-channel="closeSession"
expression="headers['closeableResource'].close()" />