Setting up S3 for logs in Airflow

Date: 2021-11-01 02:17:25

I am using docker-compose to set up a scalable airflow cluster. I based my approach off of this Dockerfile https://hub.docker.com/r/puckel/docker-airflow/

My problem is getting the logs set up to write/read from s3. When a dag has completed I get an error like this

*** Log file isn't local.
*** Fetching here: http://ea43d4d49f35:8793/log/xxxxxxx/2017-06-26T11:00:00
*** Failed to fetch log file from worker.

*** Reading remote logs...
Could not read logs from s3://buckets/xxxxxxx/airflow/logs/xxxxxxx/2017-06-26T11:00:00

I set up a new section in the airflow.cfg file like this

[MyS3Conn]
aws_access_key_id = xxxxxxx
aws_secret_access_key = xxxxxxx
aws_default_region = xxxxxxx

And then specified the s3 path in the remote logs section in airflow.cfg

remote_base_log_folder = s3://buckets/xxxx/airflow/logs
remote_log_conn_id = MyS3Conn

Did I set this up properly and is there a bug? Is there a recipe for success here that I am missing?

-- Update

I tried exporting in URI and JSON formats and neither seemed to work. I then exported the aws_access_key_id and aws_secret_access_key and Airflow started picking them up. Now I get this error in the worker logs

6/30/2017 6:05:59 PM INFO:root:Using connection to: s3
6/30/2017 6:06:00 PM ERROR:root:Could not read logs from s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PM ERROR:root:Could not write logs to s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PM Logging into: /usr/local/airflow/logs/xxxxx/2017-06-30T23:45:00

-- Update

I found this link as well https://www.mail-archive.com/dev@airflow.incubator.apache.org/msg00462.html

I then shelled into one of my worker machines (separate from the webserver and scheduler) and ran this bit of code in python

import airflow
s3 = airflow.hooks.S3Hook('s3_conn')
s3.load_string('test', airflow.conf.get('core', 'remote_base_log_folder'))

I receive this error.

boto.exception.S3ResponseError: S3ResponseError: 403 Forbidden
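
To double-check whether the exported keys can reach the bucket at all, outside of Airflow, here is a quick sanity check from the same worker; this is only a sketch, it assumes the AWS CLI is installed there and <bucket> stands in for the real bucket name:

# Show which AWS identity the exported keys resolve to
aws sts get-caller-identity
# Try listing the log prefix with the same credentials; a 403 here points at bucket/IAM permissions
aws s3 ls s3://<bucket>/airflow/logs/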

I tried exporting several different types of AIRFLOW_CONN_ envs as explained here in the connections section https://airflow.incubator.apache.org/concepts.html and by other answers to this question.

s3://<AWS_ACCESS_KEY_ID>:<AWS_SECRET_ACCESS_KEY>@S3

{"aws_account_id":"<xxxxx>","role_arn":"arn:aws:iam::<xxxx>:role/<xxxxx>"}

{"aws_access_key_id":"<xxxxx>","aws_secret_access_key":"<xxxxx>"}

I have also exported AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY with no success.

These credentials are being stored in a database, so once I add them in the UI they should be picked up by the workers, but for some reason they are not able to write/read logs.

5 Answers

#1


13  

You need to set up the S3 connection through the Airflow UI. For this, you need to go to the Admin -> Connections tab in the Airflow UI and create a new row for your S3 connection.

An example configuration would be:

Conn Id: my_conn_S3

Conn Type: S3

Extra: {"aws_access_key_id":"your_aws_key_id", "aws_secret_access_key": "your_aws_secret_key"}
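
For the remote logger to actually use this connection, remote_log_conn_id has to match the Conn Id above. As a hedged sketch, here are the two related settings from the question's airflow.cfg expressed with the AIRFLOW__{SECTION}__{KEY} environment-variable convention (the bucket name is a placeholder):

export AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://<your-bucket>/airflow/logs
export AIRFLOW__CORE__REMOTE_LOG_CONN_ID=my_conn_S3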

#2


22  

NOTE: As of Airflow 1.9.0 remote logging has been significantly altered. There are plans to make logging easier in future - e.g. autodetect cloud provider from a bucket string. These changes are not live yet, but keep a close eye on releases. If you are using 1.9.0, read on.

Reference here

Complete Instructions:

  1. Create a directory to store configs and place it so that it can be found in PYTHONPATH. One example is $AIRFLOW_HOME/config (a quick import check for this is sketched after these steps)

  2. Create empty files called $AIRFLOW_HOME/config/log_config.py and $AIRFLOW_HOME/config/__init__.py

  3. Copy the contents of airflow/config_templates/airflow_local_settings.py into the log_config.py file that was just created in the step above.

  4. Customize the following portions of the template:

    #Add this variable to the top of the file. Note the trailing slash.
    S3_LOG_FOLDER = 's3://<bucket where logs should be persisted>/'
    
    Rename DEFAULT_LOGGING_CONFIG to LOGGING_CONFIG
    LOGGING_CONFIG = ...
    
    Add a S3TaskHandler to the 'handlers' block of the LOGGING_CONFIG variable
    's3.task': {
        'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
        'formatter': 'airflow.task',
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        's3_log_folder': S3_LOG_FOLDER,
        'filename_template': FILENAME_TEMPLATE,
    },
    
     Update the airflow.task and airflow.task_runner blocks to be 's3.task' instead of 'file.task'.
    'loggers': {
        'airflow.task': {
            'handlers': ['s3.task'],
            ...
        },
        'airflow.task_runner': {
            'handlers': ['s3.task'],
            ...
        },
        'airflow': {
            'handlers': ['console'],
            ...
        },
    }
    
  5. Make sure an S3 connection hook has been defined in Airflow, as per the above answer. The hook should have read and write access to the S3 bucket defined above in S3_LOG_FOLDER.

  6. Update $AIRFLOW_HOME/airflow.cfg to contain:

    task_log_reader = s3.task
    logging_config_class = log_config.LOGGING_CONFIG
    remote_log_conn_id = <name of the s3 platform hook>
    
  7. Restart the Airflow webserver and scheduler, and trigger (or wait for) a new task execution.

  8. Verify that logs are showing up for newly executed tasks in the bucket you’ve defined.

  9. Verify that the s3 storage viewer is working in the UI. Pull up a newly executed task, and verify that you see something like:

    *** Reading remote log from s3://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log.
    [2017-10-03 21:57:50,056] {cli.py:377} INFO - Running on host chrisr-00532
    [2017-10-03 21:57:50,093] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py']
    [2017-10-03 21:57:51,264] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,263] {__init__.py:45} INFO - Using executor SequentialExecutor
    [2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py
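
Before restarting in step 7, a quick hedged sanity check that the module from steps 1-3 is importable and contains the new handler; this assumes the directory from step 1 is $AIRFLOW_HOME/config:

# Make the config directory importable, then list the configured handler names
export PYTHONPATH=$PYTHONPATH:$AIRFLOW_HOME/config
python -c "import log_config; print(sorted(log_config.LOGGING_CONFIG['handlers']))"
# 's3.task' should appear in the printed handler names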
    

#3


4  

Here's a solution if you don't want to use the admin UI.

My deployment process is Dockerized, and I never touch the admin UI. I also like setting Airflow-specific environment variables in a bash script, which overrides the .cfg file.

airflow[s3]

First of all, you need the s3 subpackage installed to write your Airflow logs to S3. (boto3 works fine for the Python jobs within your DAGs, but the S3Hook depends on the s3 subpackage.)

One more side note: conda install doesn't handle this yet, so I have to do pip install airflow[s3].

Environment variables

In a bash script, I set these core variables. Starting from these instructions but using the naming convention AIRFLOW__{SECTION}__{KEY} for environment variables, I do:

export AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucket/key
export AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_uri
export AIRFLOW__CORE__ENCRYPT_S3_LOGS=False

S3 connection ID

s3_uri is a connection ID that I made up. In Airflow, it corresponds to another environment variable, AIRFLOW_CONN_S3_URI. The value of that is your S3 path, which has to be in URI form. That's

s3://access_key:secret_key@bucket/key

Store this however you handle other sensitive environment variables.
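
Concretely, that means one more export alongside the others above; a sketch with placeholder credentials (a secret key containing characters like "/" will likely need to be URL-encoded):

export AIRFLOW_CONN_S3_URI='s3://access_key:secret_key@bucket/key'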

With this configuration, Airflow will write your logs to S3. They will follow the path of s3://bucket/key/dag/task_id.

#4


1  

Just a side note to anyone following the very useful instructions in the above answer: if you stumble upon this issue: "ModuleNotFoundError: No module named 'airflow.utils.log.logging_mixin.RedirectStdHandler'" as referenced here (which happens when using airflow 1.9), the fix is simple: use this base template instead: https://github.com/apache/incubator-airflow/blob/v1-9-stable/airflow/config_templates/airflow_local_settings.py (and follow all other instructions in the above answer).
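
If it helps, one hedged way to fetch exactly that file is to pull the raw counterpart of the link above straight into the config directory from answer #2 (the target path is an assumption), then apply the same customizations described there:

curl -o $AIRFLOW_HOME/config/log_config.py https://raw.githubusercontent.com/apache/incubator-airflow/v1-9-stable/airflow/config_templates/airflow_local_settings.py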

The current template incubator-airflow/airflow/config_templates/airflow_local_settings.py present in master branch contains a reference to the class "airflow.utils.log.s3_task_handler.S3TaskHandler", which is not present in apache-airflow==1.9.0 python package. Hope this helps!

#5


0  

To complete Arne's answer with the recent Airflow updates: you do not need to set task_log_reader to any value other than the default one, task.

If you follow the default logging template airflow/config_templates/airflow_local_settings.py, you can see that since this commit (note that the handler's name changed to 's3': {'task'...} instead of s3.task) the value of the remote folder (REMOTE_BASE_LOG_FOLDER) is what selects the right handler:

REMOTE_LOGGING = conf.get('core', 'remote_logging')

if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb'])
elif REMOTE_LOGGING and ELASTICSEARCH_HOST:
        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['elasticsearch'])
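
Putting it together, a minimal sketch of the settings that drive this prefix-based dispatch, written in the AIRFLOW__{SECTION}__{KEY} environment-variable form (connection name and bucket are placeholders):

export AIRFLOW__CORE__REMOTE_LOGGING=True
export AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://<your-bucket>/airflow/logs
export AIRFLOW__CORE__REMOTE_LOG_CONN_ID=MyS3Conn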

More details on how to log to/read from S3 : https://github.com/apache/incubator-airflow/blob/master/docs/howto/write-logs.rst#writing-logs-to-amazon-s3
