如何在datalab中运行数据流管道中使用谷歌云存储

时间:2023-01-11 23:13:58

We've been running a Python pipeline in datalab that reads image files from a bucket in google cloud storage (importing google.datalab.storage). Originally we were using DirectRunner and this worked fine, but now we're trying to use DataflowRunner, and we're having import errors. Even if we include "import google.datalab.storage" or any variant thereof inside the function run by the pipeline, we get errors such as "No module named 'datalab.storage'". We've also tried using the save_main_session, requirements_file, and setup_file flags with no luck. How would we correctly access image files in cloud storage buckets in a dataflow pipeline?

我们一直在datalab中运行Python管道,从谷歌云存储中的桶中读取图像文件(导入google.datalab.storage)。最初我们使用DirectRunner,这很好,但现在我们尝试使用DataflowRunner,我们有导入错误。即使我们在管道运行的函数中包含“import google.datalab.storage”或其任何变体,我们也会收到错误,例如“没有名为'datalab.storage'的模块”。我们也尝试过使用save_main_session,requirements_file和setup_file标志而没有运气。我们如何在数据流管道中正确访问云存储桶中的图像文件?

EDIT: My original error was due to specifying the requirements_file flag with incorrect syntax (i.e. "--requirements_file ./requirements.txt"). I think I've fixed the syntax there, but now I'm getting a different error. Here's a basic version of the code we're trying to run- we have a pipeline that reads files from a storage bucket in Google Cloud. We have a datalab notebook with a cell containing the following Python code:

编辑:我原来的错误是由于指定了不正确语法的requirements_file标志(即“--requirements_file ./requirements.txt”)。我想我已经修复了那里的语法,但现在我得到了一个不同的错误。这是我们尝试运行的代码的基本版本 - 我们有一个管道,可以从Google Cloud中的存储桶中读取文件。我们有一个带有包含以下Python代码的单元格的datalab笔记本:

import apache_beam as beam
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import GoogleCloudOptions
from apache_beam.utils.pipeline_options import StandardOptions
import google.datalab.storage as storage

bucket = "BUCKET_NAME"
shared_bucket = storage.Bucket(bucket)

# Create and set PipelineOptions. 
options = PipelineOptions(flags = ["--requirements_file", "./requirements.txt"])
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = "PROJECT_NAME"
google_cloud_options.job_name = 'test-pipeline-requirements'
google_cloud_options.staging_location = 'gs://BUCKET_NAME/binaries'
google_cloud_options.temp_location = 'gs://BUCKET_NAME/temp'
options.view_as(StandardOptions).runner = 'DataflowRunner'

def read_file(input_tuple):
  filepath = input_tuple[0]
  shared_object = shared_bucket.object(filepath)
  f = shared_object.read_stream()
  # More processing of f's contents
  return input_tuple

# File paths relative to the bucket
input_tuples = [("FILEPATH_1", "UNUSED_FILEPATH_2")]
p = beam.Pipeline(options = options)
all_files = (p | "Create file path tuple" >> beam.Create(input_tuples))
all_files = (all_files | "Read file" >> beam.FlatMap(read_file))
p.run()

Meanwhile there is a file named "requirements.txt" in the same directory as the notebook, with only the line

同时,在与笔记本相同的目录中有一个名为“requirements.txt”的文件,只有该行

datalab==1.0.1

This code works fine if I use DirectRunner. However, when I use DataflowRunner, I get a CalledProcessError at "p.run()", with stack trace ending with the following:

如果我使用DirectRunner,此代码工作正常。但是,当我使用DataflowRunner时,我在“p.run()”处得到一个CalledProcessError,堆栈跟踪结束于以下内容:

/usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/internal/dependency.pyc in _populate_requirements_cache(requirements_file, cache_dir)
224 '--no-binary', ':all:']
225 logging.info('Executing command: %s', cmd_args)
--> 226 processes.check_call(cmd_args)
227
228

_populate_requirements_cache(requirements_file,cache_dir)中的/usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/internal/dependency.pyc 224' - no-binary',':all:'] 225 logging .info('执行命令:%s',cmd_args) - > 226 processes.check_call(cmd_args)227 228

/usr/local/lib/python2.7/dist-packages/apache_beam/utils/processes.pyc in check_call(*args, **kwargs)
38 if force_shell:
39 kwargs['shell'] = True
---> 40 return subprocess.check_call(*args, **kwargs)
41
42

check_call中的/usr/local/lib/python2.7/dist-packages/apache_beam/utils/processes.pyc(* args,** kwargs)38如果force_shell:39 kwargs ['shell'] = True ---> 40 return subprocess.check_call(* args,** kwargs)41 42

/usr/lib/python2.7/subprocess.pyc in check_call(*popenargs, **kwargs)
538 if cmd is None:
539 cmd = popenargs[0]
--> 540 raise CalledProcessError(retcode, cmd)
541 return 0
542

check_call中的/usr/lib/python2.7/subprocess.pyc(* popenargs,** kwargs)538如果cmd为None:539 cmd = popenargs [0] - > 540引发CalledProcessError(retcode,cmd)541 return 0 542

CalledProcessError: Command '['/usr/bin/python', '-m', 'pip', 'install', '--download', '/tmp/dataflow-requirements-cache', '-r', './requirements.txt', '--no-binary', ':all:']' returned non-zero exit status 1

CalledProcessError:命令'['/ usr / bin / python',' - m','pip','install',' - download','/ tmp / dataflow-requirements-cache',' - r',' ./requirements.txt',' - no-binary',':all:']'返回非零退出状态1

It seems like the "--download" option is deprecated for pip, but that's part of the apache_beam code. I've also tried this with different ways of specifying "requirements.txt", with and without the "--save_main_session" flag, and with and without the "--setup_file" flag, but no dice.

对于pip,似乎不推荐使用“--download”选项,但这是apache_beam代码的一部分。我也尝试了不同的方法来指定“requirements.txt”,有和没有“--save_main_session”标志,有和没有“--setup_file”标志,但没有骰子。

2 个解决方案

#1


3  

The most likely issue is that you need to have Dataflow install the datalab pypi module.

最可能的问题是您需要让Dataflow安装datalab pypi模块。

Typically you would do this by listing "datalab" in the requirements.txt file you upload to Dataflow. See https://cloud.google.com/dataflow/pipelines/dependencies-python

通常,您可以通过在上传到Dataflow的requirements.txt文件中列出“datalab”来完成此操作。请参阅https://cloud.google.com/dataflow/pipelines/dependencies-python

#2


3  

If your only usage of pydatalab is to read from GCS, then I would suggest using Dataflow's gcsio. Code example:

如果您对pydatalab的唯一用法是从GCS读取,那么我建议使用Dataflow的gcsio。代码示例:

def read_file(input_tuple):
  filepath = input_tuple[0]
  with beam.io.gcp.gcsio.GcsIO().open(filepath, 'r') as f:
    # process f content
    pass

# File paths relative to the bucket
input_tuples = [("gs://bucket/file.jpg", "UNUSED_FILEPATH_2")]
p = beam.Pipeline(options = options)
all_files = (p | "Create file path tuple" >> beam.Create(input_tuples))
all_files = (all_files | "Read file" >> beam.FlatMap(read_file))
p.run()

pydatalab is pretty heavy since it is more of an data exploration library used with Datalab or Jupyter. On the other hand, Dataflow's GCSIO is natively supported in pipeline.

pydatalab相当重要,因为它更像是与Datalab或Jupyter一起使用的数据探索库。另一方面,Dataflow的GCSIO在管道中得到了本地支持。

#1


3  

The most likely issue is that you need to have Dataflow install the datalab pypi module.

最可能的问题是您需要让Dataflow安装datalab pypi模块。

Typically you would do this by listing "datalab" in the requirements.txt file you upload to Dataflow. See https://cloud.google.com/dataflow/pipelines/dependencies-python

通常,您可以通过在上传到Dataflow的requirements.txt文件中列出“datalab”来完成此操作。请参阅https://cloud.google.com/dataflow/pipelines/dependencies-python

#2


3  

If your only usage of pydatalab is to read from GCS, then I would suggest using Dataflow's gcsio. Code example:

如果您对pydatalab的唯一用法是从GCS读取,那么我建议使用Dataflow的gcsio。代码示例:

def read_file(input_tuple):
  filepath = input_tuple[0]
  with beam.io.gcp.gcsio.GcsIO().open(filepath, 'r') as f:
    # process f content
    pass

# File paths relative to the bucket
input_tuples = [("gs://bucket/file.jpg", "UNUSED_FILEPATH_2")]
p = beam.Pipeline(options = options)
all_files = (p | "Create file path tuple" >> beam.Create(input_tuples))
all_files = (all_files | "Read file" >> beam.FlatMap(read_file))
p.run()

pydatalab is pretty heavy since it is more of an data exploration library used with Datalab or Jupyter. On the other hand, Dataflow's GCSIO is natively supported in pipeline.

pydatalab相当重要,因为它更像是与Datalab或Jupyter一起使用的数据探索库。另一方面,Dataflow的GCSIO在管道中得到了本地支持。