Python|开放式基金业务数据交换文件的自动解析

近日,我同事有个开放式基金业务数据文件读取的需求,大概背景是:
人民银行制定了开放式基金业务数据交换协议的标准,规定场外开放式基金、证券公司大集合产品相关业务中机构之间进行数据交换时所采用的数据格式、数据定义和数据内容。
在实际使用中,机构会发送 txt 文件,我同事手工通过第三方软件实现数据内容的读取,之后再转为 excel 才能用于其他场景的使用。这次希望可以自动完成 txt 文件的解析(解析后插入到指定的数据库表中)。

分析过程

1、我同事提供了《中央数据交换平台开放式基金业务数据交换协议V2.2》,可以从中看到 txt 文件内容的格式和解析方法。
2、从我同事提供的阅读器 FFReader 中可以看到软件中所定义的数据字典(感谢作者:幻舞奇影-刘德位,下面简单介绍下这款软件)。

FFReader是一个通用的强大的接口数据文件解析阅读编辑工具,轻松支持百万级别的数据文件解析,目前足以支撑互联网行业日交易量,非常方便证券基金从业运维,运营人员,开发人员分析阅读接口数据,排查系统问题。
目前FFReader发布版本已经内置了如下行业公开接口的解析支持,《中央数据交换平台开放式基金业务数据交换协议》、《中国结算开放式基金新版管理人TXT接口规范》、《开放式基金业务数据交换协议0902》、《基金行业数据集中备份接口规范》、《附件1:中央监管平台机构监管综合信息系统-资产管理业务数据报送接口规范(征求意见稿)》、《集合理财电子合同接口规范-中登电子合同》、《特定客户资产管理业务电子签名合同数据接口规范(试行)-43-44》、《基金管理公司及其子公司特定客户资产管理业务电子签名合同数据接口规范(征求意见稿)-31-32》等接口文件的解析配置,且支持解析文件展示字典翻译等详细信息。
FFReader 官方地址:https://www.ffreader.cn/
FFReader 开源地址:https://gitee.com/cnldw/FinanceFileReader

根据上面的数据交换协议和 FFReader 中的数据字典,我通过 python 写了个类,便于后续其他场景的调用。

类方法介绍

  • OFDreader.get_txt_lines() 读取 TXT 文档,返回 gbk 格式的内容。
  • OFDreader.get_ofd_items() 根据 TXT 文档内容,返回匹配的数据字典列表。适用于:开放式基金。
  • OFDreader.cut_string_by_bytes() 提供给 OFDreader.get_dataList() 使用,用于数据文本的截取,不能对字符串进行 str[m:n] 直接截取的原因是数据包含中文,中文占2字节,需进行字节解码为字符的判断。
  • OFDreader.get_dataList() 用于数据行的解析,返回 json 内容,格式为:{ ‘data’:[{‘name’:xx, ‘ename’:xx, ‘type’:xx, ‘length’:xx, ‘precision’:xx, ‘value’:xx},……]}
  • OFDreader.get_title_info() 用于数据头的解析,返回 json 内容,格式为:{‘filecreator’:xx, ‘filereceiver’:xx, ‘filedate’:xx, ‘No’:xx, ‘filetype’:xx, ‘sender’:xx, ‘receiver’:xx }
  • OFDreader.filetype 文件类型,等于 OFDreader.get_ofd_items()[0]
  • OFDreader.iteam_lists 当前文件所对应的数据字典,等于 OFDreader.get_ofd_items()[1]
  • OFDreader.txt_lines txt 文档内容,等于 OFDreader.get_txt_lines()

源码

类文件 OFDreader.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import configparser
import os
import re
import json

class OFDreader:

def __init__ (self,txt_path):
self.txt_path = txt_path
self.current_dit = os.path.dirname(os.path.abspath(__file__))
self.txt_lines = self.get_txt_lines()
self.filetype,self.iteam_lists = self.get_ofd_items()

'''读取TXT文档'''
def get_txt_lines(self):
with open(self.txt_path,'r',encoding='gbk') as f:
self.txt_lines = f.readlines()
return self.txt_lines

'''获取对应的ini文件'''
def get_ofd_items(self):
# 开放式基金
if self.txt_lines[0].strip() == 'OFDCFDAT':
# 获取具体的对应文件
ini_path = os.path.join(os.path.join(self.current_dit,'config'),'OFD_{}.ini'.format(self.txt_lines[1].strip()))
config = configparser.ConfigParser()
config.read(ini_path,encoding='utf-8')
# 获取所有的sections
# sections = config.sections()
# 获取指定的sections
sections_name = '{}_001'.format(self.txt_lines[6].strip())
filetype = config.get(sections_name,'DESCRIBE').strip()
iteam_count = config.get(sections_name,'count').strip()
self.iteam_lists = []
for i in range(1,int(iteam_count)+1):
self.iteam_lists.append(config.get(sections_name,str(i)).strip().split(','))
return filetype,self.iteam_lists

'''数据文本截取'''
def cut_string_by_bytes(self,string, start,end, encoding='gbk'):
# 将字符串编码成字节串
encoded_string = string.encode(encoding)
# 按照字节数截取字节串
partial_bytes = encoded_string[start:end]
# 尝试解码回字符串
try:
# 如果没有截断字符,这将成功
cut_string = partial_bytes.decode(encoding)
except UnicodeDecodeError:
# 如果截断了字符,这将抛出异常
# 在这种情况下,我们移除截断的字节,并尝试重新解码
cut_string = partial_bytes.decode(encoding, errors='ignore')
return cut_string

'''获取数据文本'''
def get_dataList(self):
# 读取TXT数据行数
data_num_index = [index for index,element in enumerate(self.txt_lines) if element.strip() == self.iteam_lists[-1][4]]
data_num = int(self.txt_lines[data_num_index[-1]+1])
# 解析数据
ofd_data = []
for i in range(0,data_num):
new_data = []
index_sum = 0
for j in self.iteam_lists:
value1 = self.cut_string_by_bytes(self.txt_lines[data_num_index[-1]+2+i],index_sum,index_sum+int(j[1]))
if int(j[2])>0:
value1 = f'{(int(value1)/(10**int(j[2]))):.{int(j[2])}f}'
new_data.append(
{
'name':j[3],
'ename':j[4],
'type':j[0],
'length':j[1],
'precision':j[2],
'value':value1.strip()
}
)
index_sum += int(j[1])
ofd_data.append(new_data)
# return {'data':self.ofd_data}
return json.dumps( {'data':ofd_data},ensure_ascii=False)

'''获取表头数据'''
def get_title_info(self):
# filetype = self.get_ofd_items()[0]
config2 = configparser.ConfigParser()
config2.read(os.path.join(self.current_dit,r'config\CSV_基金销售监管报送.ini'),encoding='utf-8')
receiver = config2.get('Dictionary','61').strip()
receiver = re.search(r'{}:([^|]+)'.format(self.txt_lines[8].strip()),receiver).group(1)
title_info = {
'filecreator':self.txt_lines[2].strip(),
'filereceiver':self.txt_lines[3].strip(),
'filedate':self.txt_lines[4].strip(),
'No':self.txt_lines[5].strip(),
'filetype':self.filetype,
'sender':self.txt_lines[7].strip(),
'receiver':receiver
}
return json.dumps(title_info,ensure_ascii=False)

调用文件 main.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from OFDread import OFDreader
''' 主要的处理方法,将txt文件解析并插入到数据库中 '''
def txt_to_sql(txt_path,table_name):
try:
logger.info('本次文件:{}'.format(txt_path))
logger.info('开始文件解析')
ofdread = OFDreader(txt_path=txt_path)
logger.info('文件解析完成')
file_code = ofdread.txt_lines[6].strip()
logger.info('文件编码:{}'.format(file_code))
file_data = ofdread.get_dataList()
json_data = json.loads(file_data)
logger.info(file_data)
cursor,connection = connect_oracle(dsn)
logger.info('连接数据库成功')

if file_code == '04': # 交易确认文件
logger.info('处理04类型文件')
for datas in json_data['data']: # 遍历每条数据
logger.info('处理数据行:{}'.format(str(datas)))
columns,values = get_colunm_values(datas)
insertsql = "insert into {table_name} values ({values})".format(table_name=table_name,values=values)
logger.info('sql:insert into {table_name} values ({values})'.format(table_name=table_name,values=values))
cursor.execute(insertsql)
connection.commit()
logger.info('该行处理完毕')
elif file_code == '06': # 分红数据文件
logger.info('处理06类型文件')
for datas in json_data['data']: # 遍历每条数据
logger.info('处理数据行:{}'.format(str(datas)))
columns,values = get_colunm_values(datas)
insertsql = "insert into {table_name} values ({values})".format(table_name=table_name,values=values)
logger.info('sql:insert into {table_name} values ({values})'.format(table_name=table_name,values=values))
cursor.execute(insertsql)
connection.commit()
logger.info('该行处理完毕')
else :
logger.info('error:不支持的数据类型,数据编码为 {}'.format(file_code))
cursor.close()
connection.close()
logger.info('本次处理完毕,关闭数据库连接')
return True
except Exception as e:
logger.error(e)
return False

if __name__ == '__main__':
# 根据实际情况调用,其他代码省略
txt_path = r'xx'
table_name = r'xx'
txt_to_sql(txt_path,table_name):

商业转载请联系作者获得授权,非商业转载请注明出处。

支付宝打赏 微信打赏

如果文章对你有帮助,欢迎点击上方按钮打赏作者

Python|开放式基金业务数据交换文件的自动解析
http://hncd1024.github.io/2024/02/07/python_OFDread/
作者
CHEN DI
发布于
2024-02-07
许可协议