1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
| from aligo import Aligo, Auth, GetShareFileListRequest
Auth._EMAIL_HOST = '' Auth._EMAIL_PORT = '465' Auth._EMAIL_USER = '' Auth._EMAIL_PASSWORD = ''
class WmAligo(Aligo): def __init__(self, **options): super().__init__(**options)
def _walk(self, source_dirs, root_file_data): result_files = [] result_dirs = []
if not root_file_data.get('file_id'): files = self.get_file_list() else: files = self.get_file_list(root_file_data.get('file_id'))
for file in files: if file.type == "folder": result_dirs.append(file) source_dirs.append({ 'file_id': file.file_id, 'path': root_file_data.get('path') + file.name + '/' }) continue
if file.type == "file": result_files.append(file)
return [root_file_data.get('file_id'), result_files, result_dirs, root_file_data.get('path')]
def _walk_share(self, root_share_token, source_dirs, root_file_data): result_files = [] result_dirs = []
files = self.get_share_file_list_by_share_id(parent_file_id=root_file_data.get('file_id'), share_token=root_share_token)
for file in files: if file.type == "folder": result_dirs.append(file) source_dirs.append({ 'file_id': file.file_id, 'path': root_file_data.get('path') + file.name + '/' }) continue
if file.type == "file": result_files.append(file)
return [root_share_token, root_file_data.get('file_id'), result_files, result_dirs, root_file_data.get('path')]
def walk(self, file_id=None): if file_id: dir_file = self.get_file(file_id) if not dir_file: raise Exception("file_id: {} 目录不存在".format(file_id))
if dir_file.type != "folder": raise Exception("file_id: {} 文件非目录".format(file_id)) source_dirs = [ { 'file_id': file_id, 'path': '/' } ] else: source_dirs = [ { 'file_id': None, 'path': '/' } ]
index = 0
while len(source_dirs) > index: dir_file_id = source_dirs[index] yield self._walk(source_dirs, dir_file_id) index = index + 1
def get_share_file_list_by_share_id(self, parent_file_id=None, share_id=None, share_token=None):
if not share_token and share_id: share_token = self.get_share_token(share_id)
if not share_token: raise Exception("缺失 share_token")
if parent_file_id: body = GetShareFileListRequest(share_id=share_token.share_id, parent_file_id=parent_file_id) return self.get_share_file_list(body=body, parent_file_id=parent_file_id, share_token=share_token)
body = GetShareFileListRequest(share_id=share_token.share_id) return self.get_share_file_list(body=body, share_token=share_token)
def walk_share(self, share_id, file_id=None):
index = 0
source_dirs = [ { 'file_id': file_id, 'path': '/' } ] share_token_root = self.get_share_token(share_id)
if not share_token_root: raise Exception("获取 token root 失败")
while len(source_dirs) > index: dir_file_data = source_dirs[index]
result = self._walk_share(share_token_root, source_dirs, dir_file_data)
yield result index = index + 1
def get_download_url_str(self, file=None, file_id=None): if file and not file_id: file_id = file.file_id
dl_data = self.get_download_url(file_id=file_id) return dl_data.url
|