Python|文件自动下载、解压、核对

需求概述

序号 场景概况 需求描述
1 文件数量核验 巡查时间:法定工作日/交易日/自然日
巡查路径:D:\xxx\xxx\YYYYMMDD
校验数量:34
2 文件是否存在 巡查时间:法定工作日/交易日/自然日
巡查路径:D:\data\xxx\sh\YYYYMMDD
校验文件:OFD_123455_YYYYMMDD.TXT
3 文件解压转存 巡查时间:法定工作日/交易日/自然日
来源路径:Z:\YYYYMMDD\XX系列产品合并
目的路径:D:\data\qs\YYYYMMDD
处理方式:从来源路径拷贝压缩包,解压至目的路径
4 邮箱文件接收 处理时间:法定工作日/交易日/自然日
发件人:xx@xx.com,xx@xx.com
文件格式:GZ99001.xls、GZ99002.xls
目的路径:D:\data\gzb\YYYYMMDD+1
处理方式:监控收件人邮箱,若涉及发件人的邮件,则下载目的文件,保存至目的路径。其中若邮件涉及多个日期的文件,仅获取最后一个日期的文件即可。

实现方式

基础参数维护

通过分析,主要涉及文件数量核验、文件归集处理、邮件附件接收三类场景。考虑到后续需求的扩展和调整。涉及处理时间、文件路径、核对数量、提示语等信息以参数的形式进行配置,通过泛微系统做建模查询表并开放给用户维护。

数字员工轮询

文件数量核对:数字员工根据基础参数维护的信息进行巡检,每日巡检结果记录到巡检中间表中,用于后续巡检结果通知。
文件是否存在:数字员工根据基础参数维护的信息进行巡检,每日巡检结果记录到巡检中间表中,用于后续巡检结果通知。
文件解压转存:数字员工根据基础参数维护的信息进行文件检查,若检索到压缩包则复制到目的路径进行文件解压,解压成功后删除复制的压缩包。处理结果记录到巡检中间表中,用于后续巡检结果通知。
邮件文件接收:数字员工根据基础参数维护的信息进行邮件过滤,过滤到目的邮件后进行附件的识别、下载和另存(正常工作日中邮件附件为证券投资基金估值表_YYYYMMDD.xls的格式,若涉及国庆等假期后会出现附件有多个,格式证券投资基金估值表_YYYYMMDD.xls,此时只需要下载最后一个日期的文件,下载时文件名修改为GZ99001.xls)。

校验结果通知

根据数字员工轮询处理结果,同时以微信、钉钉方式进行通知。若微信登录方式异常时,可保证钉钉消息及时通知。建议被提醒人开通钉钉的消息通知。

代码如下

main_fileCheck.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
import os
import glob
from loguru import logger
from pyunpack import Archive
import datetime
from workalendar.asia import China
from mssql import closeConn, connectMSsql, executeSQL
from sendMsg import dingTalk
from jictmail import connectEmail
import shutil
import sqlite3
import exchange_calendars as trade_date

# 插入巡查信息记录表
def InsertDatabase(title,querydate,msg,issend,senduser):
insertContent = (title,querydate,msg,issend,senduser)
conn = sqlite3.connect(r'D:\sqlite\xxxx')
c = conn.cursor()
querySql = c.execute("insert into xxxx (title,querydate,msg,issend,senduser) values (?,?,?,?,?)",insertContent)
conn.commit()
conn.close()
logger.info('#######{}日,{}的检查结果已插入数据库#######'.format(querydate,title))

# 钉钉消息发送
def SendDingTalk(today):
# 封装钉钉
agent_id = 'xxxx'
AppKey = 'xxxx'
AppSecret = 'xxxx'
access_token = dingTalk.getAccessToken(AppKey = AppKey,AppSecret= AppSecret)
# 连接数据库查询并发送消息
conn = sqlite3.connect(r'D:\sqlite\xxxx')
c = conn.cursor()
queryUserSql = c.execute("select distinct senduser from xxxx where issend='0'")
userList = []
for row in queryUserSql:
userList.append(row[0])
for a in userList:
# 生成发送内容
queryMsgSql = c.execute("select msg from xxx where querydate='{}' and senduser='{}' and issend='0'".format(today,a))
msg = '{}:'.format(today)
for rowa in queryMsgSql:
msg = msg + '\n+ ' + rowa[0]
sendmsg = '{"msg":{"markdown":{"text":"'+msg+'","title":"提醒"},"msgtype":"markdown"},"to_all_user":"false","agent_id":"'+agent_id+'","userid_list":"'+str(a)+'"} '
result = dingTalk.sendMsg(access_token,sendmsg.encode('utf-8'))
logger.info(result)
c.execute("update xxxx set issend='1' where querydate='{}' and senduser='{}' and issend='0'".format(today,a))
conn.commit()
conn.close()


# 生成当前日期yyyymmdd
def today():
today = datetime.datetime.today()
return today.strftime('%Y%m%d')

# 判断当前日期是否是法定工作日
def is_workday():
cal = China()
return cal.is_working_day(datetime.datetime.today().date())

# 判断当前日期是否是沪市交易日,yyy-mm-dd
def is_tradingday(date):
calendar = trade_date.get_calendar('XSHG')
# 判断是否是交易日
return calendar.is_session(date)

# 日期偏移量处理
# 0 法定工作日;1 自然日;3 交易日;
def new_date(datetype,date,dateadd):
# 交易日偏移量处理
if datetype == 3:
calendar = trade_date.get_calendar('XSHG')
if dateadd < 0 :
traddate = calendar.sessions_window(date,dateadd-1)[0].strftime('%Y-%m-%d')
return datetime.datetime.strptime(traddate,'%Y-%m-%d').date()
else :
traddate = calendar.sessions_window(date,dateadd+1)[-1].strftime('%Y-%m-%d')
return datetime.datetime.strptime(traddate,'%Y-%m-%d').date()
# 自然日
elif datetype == 1:
return datetime.datetime.today().date() + datetime.timedelta(days = dateadd)
# 工作日
elif datetype == 0:
cal = China()
return cal.add_working_days(datetime.datetime.today().date(),dateadd)

# 获取文件夹内的文件数量
def count_files_in_folder(folder_path):
files = glob.glob(os.path.join(folder_path,'*'))
file_count = len(files)
return file_count

# 创建文件夹
def md(path,tp=None):
if os.path.exists( path + '\\' + tp):
pass
else:
os.makedirs( path + '\\' + tp)
return path + '\\' + tp

# 清空文件夹
def rmd(path):
for filename in os.listdir(path):
file_path = os.path.join(path,filename)
if os.path.isfile(file_path):
os.remove(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)

# 拷贝文件并重命名
def ReameFile(path,newPath,newName):
# os.rename(path,os.path.join(newPath,newName))
shutil.copy(path,os.path.join(newPath,newName))

# 判断文件或文件夹是否存在
def fileIsExists(path):
if os.path.exists(path):
return True
else:
return False

# 解压文件
def unpack(zipfile,path):
Archive(zipfile).extractall(path)

'''
获取文件夹下最新日期的文件。如文件名为:
证券投资基金_测试1_2023-07-04.txt
证券投资基金_测试1_2023-07-05.txt
证券投资基金_测试1_2023-07-06.txt
证券投资基金_测试2_2023-07-05.txt
getMaxFilename('d:\\20230706','.xls')调用结果为:
['证券投资基金_测试1_2023-07-06.xls', '证券投资基金_测试2_2023-07-05.xls']
'''
def getMaxFilename(folder_path,filetype):
filenameList = []
result_dict = {}
# 文件名获取并转为list
for filename in os.listdir(folder_path):
file_name_index = filename.rfind('_')
filenameList.append([filename[:file_name_index],filename[file_name_index+1:].split('.')[0]])
# list转dict,并比较最大日期
for item in filenameList:
key = item[0]
value = item[1]
if key in result_dict:
if value > result_dict[key]:
result_dict[key] = value
else:
result_dict[key] = value
# 组装为文件名
filenameList = [key + '_' + value + filetype for key,value in result_dict.items()]
return filenameList

if __name__ == '__main__':
# 创建日志对象
FileLog = 'D:\\workspace\\xxxx\\log\\log{}.log'.format(datetime.date.today())
logger.add(FileLog,rotation="500MB", encoding="utf-8", enqueue=True)

today = today()
msg = ''
logger.info('***开始处理***')
# 连接数据库,获取文件数量核对表信息
sqlConnectParams = {"database_name": "xxx", "username": "xxx", "password": "xxx", "host": "xxx", "port": "xxx", "encoding": "cp936"}
conn = connectMSsql(sqlConnectParams)
logger.info('***已连接数据库***')

logger.info('***开始查询文件数量***')
# 处理文件数量检查
fileCheckSqlQuery = "select title,filepath,checknum,datetype,truememo,falsememo,manager,dateadd from xxx where (convert(char(10),getdate(),120)+' '+checktime+':00')>=dateadd(minute,-30,getdate()) and (convert(char(10),getdate(),120)+' '+checktime+':00')<=dateadd(minute,30,getdate())" # 一小时内需要查询的文件行
lista = executeSQL(conn,fileCheckSqlQuery)
logger.info(lista)
if len(lista) > 0:
for i in lista:
logger.info('开始处理:{}'.format(i[0]))
if i[3] == 0:
if is_workday():
logger.info('法定工作日查询')
newDate = new_date(i[3],datetime.datetime.today().date(),i[7])
dateDir = newDate.strftime('%Y%m%d')
filenum = count_files_in_folder(i[1].replace('YYYYMMDD',dateDir))
if filenum >= i[2]:
msg = i[4]
else :
msg = i[5] + '(当前数量为' + str(filenum) + '个)'
# 检查结果插入数据库
InsertDatabase(i[0],today,msg,'0',i[6])
elif i[3] == 1:
logger.info('自然日查询')
newDate = new_date(i[3],datetime.datetime.today().date(),i[7])
dateDir = newDate.strftime('%Y%m%d')
filenum = count_files_in_folder(i[1].replace('YYYYMMDD',dateDir))
if filenum >= i[2]:
msg = i[4]
else :
msg = i[5] + '(当前数量为' + str(filenum) + '个)'
# 检查结果插入数据库
InsertDatabase(i[0],today,msg,'0',i[6])
elif i[3] == 3:
if is_tradingday(today):
logger.info('交易日查询')
newDate = new_date(i[3],datetime.datetime.today().date(),i[7])
dateDir = newDate.strftime('%Y%m%d')
filenum = count_files_in_folder(i[1].replace('YYYYMMDD',dateDir))
if filenum >= i[2]:
msg = i[4]
else :
msg = i[5] + '(当前数量为' + str(filenum) + '个)'
# 检查结果插入数据库
InsertDatabase(i[0],today,msg,'0',i[6])
else:
if not is_workday():
logger.info('法定休息日查询')
if filenum >= i[2]:
msg = i[4]
else :
msg = i[5] + '(当前数量为' + str(filenum) + '个)'
# 检查结果插入数据库
InsertDatabase(i[0],today,msg,'0',i[6])

logger.info('***处理结束***')

# 处理文件解压
logger.info('***开始处理文件解压及转存***')
fileUnrarSqlQuery = "select title,rarpath,filepath,datetype,truememo,falsememo,manager from xxxx where (convert(char(10),getdate(),120)+' '+checktime+':00')>=dateadd(minute,-30,getdate()) and (convert(char(10),getdate(),120)+' '+checktime+':00')<=dateadd(minute,30,getdate())" # 一小时内需要查询的文件行
listb = executeSQL(conn,fileUnrarSqlQuery)
logger.info(listb)
if len(listb) >0:
for i in listb:
filename = i[1].replace('YYYYMMDD',today)+'\\'+today+'.RAR'
# 创建文件夹
md(i[2].replace('YYYYMMDD',''),today)
# 判断文件是否存在
if fileIsExists(filename):
# 0 法定工作日;1 自然日;2 法定休息日
if i[3] == 0:
if is_workday():
logger.info('法定节假日处理')
unpack(filename,i[2].replace('YYYYMMDD',today))
msg = i[4]
# 检查结果插入数据库
InsertDatabase(i[0],today,msg,'0',i[6])
elif i[3] == 1:
logger.info('自然日处理')
unpack(filename,i[2].replace('YYYYMMDD',today))
msg = i[4]
# 检查结果插入数据库
InsertDatabase(i[0],today,msg,'0',i[6])
else:
if not is_workday():
logger.info('法定休息日处理')
unpack(filename,i[2].replace('YYYYMMDD',today))
msg = i[4]
# 检查结果插入数据库
InsertDatabase(i[0],today,msg,'0',i[6])
else:
msg = i[5]
# 检查结果插入数据库
InsertDatabase(i[0],today,msg,'0',i[6])

SendDingTalk(today)
logger.info('处理结束')


# 处理文件下载及更名
logger.info('***开始处理文件下载及更名***')
fileDownloadSqlQuery = "select title,fromuser,filepath,dateadd,filename,datetype,truememo,falsememo,manager,downloadname from xxxx where (convert(char(10),getdate(),120)+' '+checktime+':00')>=dateadd(minute,-30,getdate()) and (convert(char(10),getdate(),120)+' '+checktime+':00')<=dateadd(minute,30,getdate())" # 一小时内需要查询的文件行
listc = executeSQL(conn,fileDownloadSqlQuery)
logger.info(listc)
if len(listc) >0:
loadpath = 'D:\\workspace\\xxxx\\临时文件'
for i in listc:
params = {"mailUrl":"mail.xxxx.cn","account":"xxxx","password":"xxxx","port":"143","fromUser":i[1],"keyword":i[4],"filePath":loadpath}
# 下载邮件中的附件到临时文件夹
logger.info('下载邮件中的附件到临时文件夹')
try:
connectEmail(params)
logger.info('下载完成')
except Exception as e :
logger.info(e)
msg = i[7]
logger.info('下载失败')
InsertDatabase(i[0],today,msg,'0',i[8])
# 日期偏移量处理
logger.info('设置日期偏移量')
newDate = datetime.datetime.today().date() + datetime.timedelta(days = i[3])
dateDir = newDate.strftime('%Y%m%d')
# 创建文件夹
logger.info('创建最终文件夹')
if not fileIsExists(i[2].replace('YYYYMMDD',dateDir)):
newFilePath = md(i[2].replace('YYYYMMDD',''),dateDir)
# 获取目标文件并拷贝到目的路径
logger.info('拷贝文件到最终路径')
filelist = getMaxFilename(loadpath,'.xls')
for j in filelist:
ReameFile(loadpath + '\\' + j,newFilePath,i[9])
# 处理结束,清空文件夹
logger.info('处理结束,清空临时文件夹')
rmd('D:\\workspace\\xxxx\\临时文件\\')
msg = i[6]
# 检查结果插入数据库
InsertDatabase(i[0],today,msg,'0',i[8])
SendDingTalk(today)

logger.info('***开始判断文件是否存在***')
# 处理文件是否存在的判断检查
fileCheckSqlQuery = "select title,filepath,checknum,datetype,truememo,falsememo,manager,dateadd from xxxx where (convert(char(10),getdate(),120)+' '+checktime+':00')>=dateadd(minute,-30,getdate()) and (convert(char(10),getdate(),120)+' '+checktime+':00')<=dateadd(minute,30,getdate())" # 一小时内需要查询的文件行
lista = executeSQL(conn,fileCheckSqlQuery)
logger.info(lista)
if len(lista) > 0:
for i in lista:
logger.info('开始处理:{}'.format(i[0]))
if i[3] == 0:
if is_workday():
logger.info('法定工作日查询')
newDate = new_date(i[3],datetime.datetime.today().date(),i[7])
dateDir = newDate.strftime('%Y%m%d')
pathname = i[1].replace('YYYYMMDD',dateDir)+'\\'+i[2].replace('YYYYMMDD',dateDir)
if fileIsExists(pathname):
msg = i[4]
else :
msg = i[5]
# 检查结果插入数据库
InsertDatabase(i[0],today,msg,'0',i[6])
elif i[3] == 1:
logger.info('自然日查询')
newDate = new_date(i[3],datetime.datetime.today().date(),i[7])
dateDir = newDate.strftime('%Y%m%d')
pathname = i[1].replace('YYYYMMDD',dateDir)+'\\'+i[2].replace('YYYYMMDD',dateDir)
if fileIsExists(pathname):
msg = i[4]
else :
msg = i[5]
# 检查结果插入数据库
InsertDatabase(i[0],today,msg,'0',i[6])
elif i[3] == 3:
if is_tradingday(today):
logger.info('交易日查询')
newDate = new_date(i[3],datetime.datetime.today().date(),i[7])
dateDir = newDate.strftime('%Y%m%d')
pathname = i[1].replace('YYYYMMDD',dateDir)+'\\'+i[2].replace('YYYYMMDD',dateDir)
if fileIsExists(pathname):
msg = i[4]
else :
msg = i[5]
# 检查结果插入数据库
InsertDatabase(i[0],today,msg,'0',i[6])
else:
if not is_workday():
logger.info('法定休息日查询')
newDate = new_date(i[3],datetime.datetime.today().date(),i[7])
dateDir = newDate.strftime('%Y%m%d')
pathname = i[1].replace('YYYYMMDD',dateDir)+'\\'+i[2].replace('YYYYMMDD',dateDir)
if fileIsExists(pathname):
msg = i[4]
else :
msg = i[5]
# 检查结果插入数据库
InsertDatabase(i[0],today,msg,'0',i[6])
logger.info('***处理结束***')

# 发送消息
SendDingTalk(today)
logger.info('***处理结束***')
# 关闭数据库连接
closeConn(conn)





sendMsg

参照《Python|钉钉消息通知对接》。

jicmail

参照《UiAuto|Email插件》。

mssql

参照《UiAuto|SQLServer插件》。


商业转载请联系作者获得授权,非商业转载请注明出处。

支付宝打赏 微信打赏

如果文章对你有帮助,欢迎点击上方按钮打赏作者

Python|文件自动下载、解压、核对
http://hncd1024.github.io/2023/07/07/Python_fileCheck/
作者
CHEN DI
发布于
2023-07-07
许可协议