Maple's Blog.

[Untitled Post]

字数统计: 440阅读时长: 2 min
2023/06/12

aligo 实现类似 os.walk 的功能。

实测并不实用,仅当玩具。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
from aligo import Aligo, Auth, GetShareFileListRequest

Auth._EMAIL_HOST = ''
Auth._EMAIL_PORT = '465'
Auth._EMAIL_USER = ''
Auth._EMAIL_PASSWORD = ''


class WmAligo(Aligo):
def __init__(self, **options):
super().__init__(**options)

def _walk(self, source_dirs, root_file_data):
result_files = []
result_dirs = []

if not root_file_data.get('file_id'):
files = self.get_file_list()
else:
files = self.get_file_list(root_file_data.get('file_id'))

for file in files:
if file.type == "folder":
result_dirs.append(file)
source_dirs.append({
'file_id': file.file_id,
'path': root_file_data.get('path') + file.name + '/'
})
continue

if file.type == "file":
result_files.append(file)

return [root_file_data.get('file_id'), result_files, result_dirs, root_file_data.get('path')]

def _walk_share(self, root_share_token, source_dirs, root_file_data):
result_files = []
result_dirs = []

files = self.get_share_file_list_by_share_id(parent_file_id=root_file_data.get('file_id'),
share_token=root_share_token)

for file in files:
if file.type == "folder":
result_dirs.append(file)
source_dirs.append({
'file_id': file.file_id,
'path': root_file_data.get('path') + file.name + '/'
})
continue

if file.type == "file":
result_files.append(file)

return [root_share_token, root_file_data.get('file_id'), result_files, result_dirs, root_file_data.get('path')]

def walk(self, file_id=None):
if file_id:
dir_file = self.get_file(file_id)
if not dir_file:
raise Exception("file_id: {} 目录不存在".format(file_id))

if dir_file.type != "folder":
raise Exception("file_id: {} 文件非目录".format(file_id))
source_dirs = [
{
'file_id': file_id,
'path': '/'
}
]
else:
source_dirs = [
{
'file_id': None,
'path': '/'
}
]

index = 0

while len(source_dirs) > index:
dir_file_id = source_dirs[index]
yield self._walk(source_dirs, dir_file_id)
index = index + 1

def get_share_file_list_by_share_id(self, parent_file_id=None, share_id=None, share_token=None):

if not share_token and share_id:
share_token = self.get_share_token(share_id)

if not share_token:
raise Exception("缺失 share_token")

if parent_file_id:
body = GetShareFileListRequest(share_id=share_token.share_id, parent_file_id=parent_file_id)
return self.get_share_file_list(body=body, parent_file_id=parent_file_id, share_token=share_token)

body = GetShareFileListRequest(share_id=share_token.share_id)
return self.get_share_file_list(body=body, share_token=share_token)

def walk_share(self, share_id, file_id=None):

index = 0

source_dirs = [
{
'file_id': file_id,
'path': '/'
}
]
share_token_root = self.get_share_token(share_id)

if not share_token_root:
raise Exception("获取 token root 失败")

while len(source_dirs) > index:
dir_file_data = source_dirs[index]

result = self._walk_share(share_token_root, source_dirs, dir_file_data)

yield result
index = index + 1

def get_download_url_str(self, file=None, file_id=None):
if file and not file_id:
file_id = file.file_id

dl_data = self.get_download_url(file_id=file_id)
return dl_data.url

CATALOG