大量小文件的实时同步方案(python…

时间:2022-11-11 19:56:24



传统的文件同步方案有rsync(单向) 和 unison(双向)等,它们需要扫描所有文件后进行比对,差量传输。如果文件数量达到了百万甚至千万量级,扫描所有文件将非常耗时。而且正在发生变化的往往是其中很少的一部分,这是非常低效的方式。

之前看了​​Amazon的Dynamo的设计文档​​,它们每个节点的数据是通过Hash Tree来实现同步,既有通过日志来同步的软实时特点(msyql, bdb等),也可以保证最终数据的一致性(rsync, unison等)。Hash Tree的大体思路是将所有数据存储成树状结构,每个节点的Hash是其所有子节点的Hash的Hash,叶子节点的Hash是其内容的Hash。这样一旦某个节点发生变化,其Hash的变化会迅速传播到根节点。需要同步的系统只需要不断查询跟节点的hash,一旦有变化,顺着树状结构就能够在logN级别的时间找到发生变化的内容,马上同步。

文件系统天然的是树状结构,尽管不是平衡的数。如果文件的修改时间是可靠的,可以表征文件的变化,那就可以用它作为文件的Hash值。另一方面,文件的修改通常是按顺序执行的,后修改的文件比早修改的文件具有更大的修改时间,这样就可以把一个目录内的最大修改时间作为它的修改时间,以实现Hash Tree。这样,一旦某个文件被修改,修改时间的信息就会迅速传播到根目录。

一般的文件系统都不是这样做的,目录的修改时间表示的是目录结构最后发生变化的时间,不包括子目录,否则会不堪重负。因为我们需要自己实现这个功能,利用Linux 2.6内核的新特性inotify获得某个目录内文件发生变化的信息,并把其修改时间传播到它的上级目录(以及再上级目录)。Python 有 ​​pyinotify​​,watch.py的代码如下:

    1. #!/usr/bin/python   
    2.
    3. from pyinotify import *
    4. import os, os.path
    5.
    6. flags = IN_CLOSE_WRITE|IN_CREATE|IN_Q_OVERFLOW
    7. dirs = {}
    8. base = '/log/lighttpd/cache/images/icon/u241'
    9. base = 'tmp'
    10.
    11. class UpdateParentDir(ProcessEvent):
    12. def process_IN_CLOSE_WRITE(self, event):
    13. print 'modify', event.pathname
    14. mtime = os.path.getmtime(event.pathname)
    15. p = event.path
    16. while p.startswith(base):
    17. m = os.path.getmtime(p)
    18. if m < mtime:
    19. print 'update', p
    20. os.utime(p, (mtime,mtime))
    21. elif m > mtime:
    22. mtime = m
    23. p = os.path.dirname(p)
    24.
    25. process_IN_MODIFY = process_IN_CLOSE_WRITE
    26.
    27. def process_IN_Q_OVERFLOW(self, event):
    28. print 'over flow'
    29. max_queued_events.value *= 2
    30.
    31. def process_default(self, event):
    32. pass
    33.
    34. wm = WatchManager()
    35. notifier = Notifier(wm, UpdateParentDir())
    36. dirs.update(wm.add_watch(base, flags, rec=True, auto_add=True))
    37.
    38.
    在已经有Hash Tree的时候,同步就比较简单了,不停地获取根目录的修改时间并顺着目录结构往下找即可。需要注意的是,在更新完文件后,需要设置修改时间为原文件的修改时间,目录也是,保证Hash Tree的一致性,否则没法同步。mirror.py的代码如下
    1. #!/usr/bin/python
    2.
    3. import sys,time,re,urllib
    4. import os,os.path
    5. from os.path import exists, isdir, getmtime
    6.
    7. src = sys.argv[1]
    8. dst = sys.argv[2]
    9.
    10. def local_mirror(src, dst):
    11. if exists(dst) and mtime == getmtime(dst):
    12. return
    13. if not isdir(src):
    14. print 'update:', dst
    15. open(dst,'wb').write(open(src).read())
    16. else:
    17. if not exists(dst):
    18. os.makedirs(dst)
    19. for filename in os.listdir(src):
    20. local_mirror(os.path.join(src,filename), os.path.join(dst,filename))
    21. os.utime(dst, (mtime,mtime))
    22.
    23. def get_info(path):
    24. f = urllib.urlopen(path)
    25. mtime = f.headers.get('Last-Modified')
    26. if mtime:
    27. mtime = time.mktime(time.strptime(mtime, '%a, %d %b %Y %H:%M:%S %Z'))
    28. content = f.read()
    29. f.close()
    30. return int(mtime), content
    31.
    32. p = re.compile(r'([\d.]+?) +([\w/]+)')
    33.
    34. def remote_mirror(src, dst):
    35. mtime, content = get_info(src)
    36. if exists(dst) and mtime == int(getmtime(dst)):
    37. return
    38. print 'update:', dst, src
    39. if not src.endswith('/'):
    40. open(dst,'wb').write(content)
    41. else:
    42. if not exists(dst):
    43. os.makedirs(dst)
    44. for mt,filename in p.findall(content):
    45. mt = int(float(mt))
    46. lpath = dst+filename
    47. if not exists(lpath) or int(getmtime(lpath)) != mt:
    48. remote_mirror(src+filename, lpath)
    49. os.utime(dst, (mtime,mtime))
    50.
    51. if src.startswith('http://'):
    52. mirror = remote_mirror
    53. else:
    54. mirror = local_mirror
    55.
    56. while True:
    57. mirror(src, dst)
    58. time.sleep(1)
    如果源文件不在同一台机器上,可以通过NFS等共享过来。或者可以通过支持列目录的HTTP服务器来访问远程目录,mirror.py 已经支持这种访问方式。server.py 是用webpy做的一个简单的只是列目录的文件服务器。由于瓶颈在IO上,它的性能不是关键。server.py的代码如下:
    1. #!/usr/bin/python
    2.
    3. import os,os.path
    4. import web
    5. import time
    6.
    7. root = 'tmp'
    8.
    9. HTTP_HEADER_TIME = '%a, %d %b %Y %H:%M:%S %Z'
    10.
    11. class FileServer:
    12. def GET(self, path):
    13. path = root + path
    14. if not os.path.exists(path):
    15. return 404
    16. mtime = time.localtime(os.path.getmtime(path))
    17. web.header('Last-Modified', time.strftime(HTTP_HEADER_TIME, mtime))
    18. if os.path.isdir(path):
    19. for file in os.listdir(path):
    20. if file.startswith('.'): continue
    21. p = os.path.join(path,file)
    22. m = os.path.getmtime(p)
    23. if os.path.isdir(p):
    24. file += '/'
    25. print m, file
    26. else:
    27. print open(path,'rb').read()
    28.
    29. urls = (
    30. "(/.*)", "FileServer",
    31. )
    32.
    33. if __name__ == '__main__':
    34. web.run(urls, globals())

    为了获得更好性能,以达到更好的实时性,Hash Tree最好是平衡的,比如BTree。如果一个文件发生变化,同步它需要进行的IO操作为N*M,其中N为数的层数,M为每层的文件数目。现在我们N为2,M最大为10000,适当减少它可以获得更好的性能,比如N为4,M为100。在以后创建目录结构时,最好能够考虑这方面的因素。

    之前hongqn推荐过一个​​利用inotify的文件同步方案​​,同步方式类似于mysql和bdb等,由于过于复杂导致不可靠而没有采用。上面这个方案只用了一百多行Python代码就基本解决问题了,是不是很帅?:-)



    ​​#Linux​​