About

这个问题来源于中兴举办的2020优招算法比赛“傅里叶派”,要求是对提供的数据进行二分图建模,然后找出4,6,8,10,12,14的无向图的总数量,同时有时间和禁止多线程操作要求。起先,我是把数据当成随机生成的普通数据,然后利用DFS(B站上有个up主讲的DFS和BFS系列非常浅显易懂,链接)和拼接单链组合成环,最后利用python的set哈希去重的方法来实现的,在一阶段数据上((256+640,256+640)大小的邻接矩阵)大概可以进1分钟至1分半钟。后面,随着第二阶段数据的增大((1344+1344,1344+1344)大小的邻接矩阵),运行时间急剧增加,大概到了30分钟。实际上,算法的主要瓶颈在去重时间上(去重数据达到亿级别的字符串),为了降低去重时间,我花了不少时间去Google,包括数据库的一些去重,或者使用bitmap位图等,考虑到硬件限制,试了一下bloom filter,但是由于hash碰撞等原因,可能出现一些误判,而且随着查重数量的增大,占用的内存也会增大。最后的测试时间上与set相比没有什么优势。

最后跟朋友交流,发现数据的一些规律得以解决。原来主办方给的数据是精心设计过的,整个数据中存在固定长度的固定数量sections,每个section中若AiBj=1A_{i}B_{j}=1,那么Ai+1Bj+1=1A_{i+1}B_{j+1}=1,存在这么一种递推关系,这样就会大大减少了搜索量和查重量。根据固定的节点修改代码,最后运行时间从半个小时降到了50s左右。

最后的数据规律是快要接近deadline知道的,虽然分析数据也应当是一个基本的素养,但是面对这种题目,我个人感觉还是追求通用性解决方案更有意义一些。比如真正面对庞大的数据量的时候,关系图是不确定的,没有这种很强的先验知识,那么应该设计系统去尽可能快速地查找和统计环的数量。我自己没有进一步深究了,目前的几个naive想法是可以考虑多线程找环查重在一起合并查重,查重获取可以采用一些数据库的技术,比如针对业务建一个服务器专门用来进行通用性查重,代码中直接调API即可。

具体题目要求,两阶段数据和代码见github仓库

算法设计

在阶段一中,没有对数据分布进行分析,而是单纯以普通的二分图找环问题进行算法设计

首先读入csv文件数据,将A, B之间关系采用邻接表的形式分别存储,并赋给A, B中每个人以独有的id(A的id为0-A总人数减一,B的id为0加A总人数-B总人数减一加A总人数),然后利用深度优先搜索思想,按id从小到大的顺利,以每个A为起点,不断递归往下深入图,找出符合长度的环,并将其存储,遍历去重,得到属于每个A的环结果。由于每个A都是往下搜索下一个A,所以搜出的环不会与前面的结果重复,最后直接相加每个A搜索出的结果即为最后的答案。

由于图的尺寸比较大,纯搜索环(以本题为例,尤其环的长度超过8以后搜索时间会迅速增加)和遍历去重比较耗时,为此需要进行优化。

a). 由于是搜索二分图中的无向环,因此环的长度只能是偶数,为了减少整体搜索时间,将10,12,14长度的环分别切成两条长度相等的单链(长度分别为6,7,8),仅头和尾节点相同的单链可以组合成以上述长度的环,4,6,8长度的环可以在搜索单链时分别反向回溯下一节点得到环的数量;

b). 遍历去重问题是个人算法设计中最为耗时的一部分,实验发现如果仅按照a)中的思想搜索环而不去重,可以较快得到答案。一种直接的去重方法是以list存放环的组成id,并升序排好嵌入另一个大list中,之后拼出的环在其中遍历比较,不相等即计数加一并将其append进大list中,但是随着环长度的增加,遍历比较的时间爆炸增长,几乎无法很好完成去重任务。因此后续考虑将已升好序的环的id转成字符串进行编码存入set()中,通过内部hash去重,来极大降低去重开销;

在阶段二中,若直接采用阶段一的思路来处理阶段二的数据,程序运行时间会从95s左右增加到1600s左右,时间复杂度太高。对这种普适性找环算法在题目限制下优化几乎不太可能,于是将目光转移到数据上。通过分析发现,整体数据可以分成好几个sections,在每个sections内部,A点的朋友关系是递增的,即若AiBjA_{i}B_{j}是朋友,那么Ai+1Bj+1A_{i+1}B_{j+1}也是朋友,这种相似性结构会导致sections内部每个点A的环组成也是相似的,且数量相同,类似于动态规划中的重叠子问题。因此,只需要计算每个sections内中开头点的环数量,然后乘以该section的长度,最后相加所有的sections即可。另外,由于是无向环,因此最后还需要将累加得出的每个固定长度环的数量除以A的节点数,去除重复搜索的影响。

由于两阶段的数据都存在这样的规律,因此可以大大简化搜索算法,只需指定不同section的开头节点和长度便可将其合并

代码

import csv
import time
from collections import deque

"""
数据之间存在规律:如果A_{i}B_{j}之间是朋友关系,那么A_{i+1}B_{j+1}也是朋友关系
每行数据之间和上一行存在递归关系,这些关系存在于每份数据固定长度的段落中,
因此可以简化搜索点,每个段落只需搜索起始点即可,后面的环数量都相同

1.通过深度优先搜索DFS在A和B之间反复往下找,直到固定长度或者无法深入为止;
2.由于环的长度越大纯搜索就越耗时,所以利用一半长度加1的单链然后组合拼接;
3.拼接过后的无向环存在重复,利用字符串编码环信息和集合set()加速遍历去重;

"""

class Solutions():

def __init__(self, input_file, output_file):
self.input_file = input_file #输入的csv文件名
self.output_file = output_file #输出的文件名
self.A_num = 0 #存储A部落人数
self.B_num = 0 # 存储B部落人数
self.graphAB = {} # 存储A与B的好友关系
#存储长度为6,7,8的单链,后续组合元素到10,12,14的环
self.lines = {6:deque([]), 7:deque([]), 8:deque([])}
#存储搜索到的长度为4,6,8, 10, 12, 14的环
self.circles = {4:set(), 6:set(), 8:set(), 10:set(), 12:set(), 14:set()}
self.count = {4:0, 6:0, 8:0, 10:0, 12:0, 14:0} #存储最终答案
self.sections_len = 0 # 存储给定数据中设定的固定段落数长度

# 读取CSV文件创建字典表示关系图
def load(self):
with open(self.input_file, 'r') as f_in:
lines = list(csv.reader(f_in))
self.A_num = len(lines) # A部落人数
self.B_num = len(lines[0]) # B部落人数
for m in range(self.A_num + self.B_num):
self.graphAB[m] = deque([])
for i in range(self.A_num):
for j in range(self.B_num):
if lines[i][j] == '1':
# A的id:0 ~ self.A_num - 1
# B的id:self.A_num ~ self.A_num + self.B_num - 1
self.graphAB[i].append(j + self.A_num)
self.graphAB[j + self.A_num].append(i)

# 深度优先遍历搜索小于等于8的路径
def DFS(self, s,A_head,path):
"""
根据好友关系搜索环,直到深度小于8或没有为止
并根据对应关系存入4,6,8的环和6,7,8的单链
s: 搜索起始点
A_head: 最开始的起点A
path: 搜索出的路径
"""
# 计算路径的长度
path_len = len(path)

# 4,6,8的路径的下一个朋友是起点,成4,6,8的环
if (path_len > 2) and (A_head in self.graphAB[s]) :
path_ = path[1:] #深拷贝当前path,丢掉开头
path_.sort() #升序排列
# 进行字符串编码
path_s = ''.join(p for p in map(str, path_))
# 利用set()和内置hash函数去重
if path_s not in self.circles[path_len]:
self.circles[path_len].add(path_s)
self.count[path_len] += 1

# 6,7,8的路径塞进存储中,后续组合配对成环
if (path_len > 5) and (path_len < 9):
self.lines[path_len].append(path[1:])

# 深度优先搜索,长度只搜到8
if path_len < 8:
for f_next in self.graphAB[s]:
# 如果一个一个搜,可以这样写,每个环只往下搜避免重复
# if (f_next not in path) and (f_next > A_head):
if (f_next not in path):
path.append(f_next)
self.DFS(f_next, A_head, path)
path.pop()


#将找到固定长度的单链两两组合成环并去重
def lines2circles(self):
"""
把以某个A为起点搜索出的固定长度的单链进行比较
找出除起终点相同任意元素都不同的单链两两配对成环
lines: 指定长度的单链集合列表,每个单链开头的元素相同,都是以A中某人为起点
"""
# 依次拼接长度为6,7,8的单链
# 初始化访问标志位,便于确立是否可以拼接两条单链
visited = [0] * (self.A_num + self.B_num)
for k,v in zip(self.lines.keys(),self.lines.values()):
len_lines = len(v) #单链的数量
# 以某个A为起点的固定长度单链可能没有或只有一个,
# 这个时候无法成环,不用继续运行
if (len_lines == 0) or (len_lines == 1):
return
# 开始找环
len_circle = 2 * k - 2
# 将lines按照最后一个元素进行升序,降低循环次数
v = sorted(v,key=lambda k:k[-1])

for t in range(len_lines - 1):
line1 = v[t]
for s in line1:
visited[s] = 1
for k in range(t+1, len_lines):
#确保只在起终点相同的line中组合,第二个没有后面一定没有
if line1[-1] != v[k][-1]:
break
#找出两个中相同的元素
line2 = v[k][:-1]
SAME = 0 # 合并双链判断标志
# 除了开头和末尾还有其他元素相同,无法配对成环
for s in line2:
if visited[s] == 1:
SAME = 1
break
if SAME == 0:
#去掉相同的头和尾元素,成环并升序
circle = line1 + line2
circle.sort()
# 字符串编码环存入set()
circle_ = ''.join(c for c in map(str, circle))
if circle_ not in self.circles[len_circle]:
self.circles[len_circle].add(circle_)
self.count[len_circle] += 1
for s in line1:
visited[s] = 0

# 以每个A为起点搜索所有指定长度的不重复无向环
def search_circles(self):
"""
以每个A为起点找环;
由于数据存在规律,可以简化搜索点;
否则,如果是随机生成朋友关系,
需要一个一个点去搜

"""
# 第一阶段数据各段落开头节点
if self.A_num == 256:
nodes = [0, 64, 128, 192]
self.sections_len = 64
# 第二阶段数据各段落开头节点
elif self.A_num == 1344:
nodes = [0, 192, 384, 576, 768, 960, 1152]
self.sections_len = 192
# 如果数据是随机生成的
else:
nodes = range(self.A_num)
for i in nodes:
path = [i]
#找4,6,8长度环和6,7,8长度单链
self.DFS(i,i,path)
# 拼接10,12,14长度环
self.lines2circles()

# 清空,下一次找环做准备
self.lines = {6:deque([]), 7:deque([]), 8:deque([])}
self.circles = {4:set(), 6:set(), 8:set(), 10:set(), 12:set(), 14:set()}

# 简单打印程序进度
if self.sections_len != 0:
print('progress::%d / %d'%(i+self.sections_len, self.A_num), end='\r')
else:
print('progress::%d / %d'%(i+1, self.A_num), end='\r')

# 写入答案到txt文件
def output_ans(self):
"""
把最终的答案写到result.txt文件中
并打印至终端
"""
with open(self.output_file, 'w') as f_out:
for k,v in zip(self.count.keys(), self.count.values()):
# 每个A节点会被重复遍历,因此需要除以环中包含的A节点数
# 如果是一个一个搜,改一下DFS最后的if代码,此处不再需要
# 这里是为了统一数据规律的情况做了折中处理
v = v * self.sections_len // (k // 2)
f_out.write('木托盘上有%d个名字的祭品最多有%d个;\n'%(k,v))
print('\n', v)


if __name__ == '__main__':

t1 = time.time()
find = Solutions('Example.csv', 'result.txt')
find.load()
find.search_circles()
find.output_ans()
t2 = time.time()
print('Runing Time: %ds'%(t2-t1))


个人线下IDE 采用VS Code,笔记本电脑运行环境为64位 Win10下的Anaconda3 Python 3.7虚拟环境,CPU为2.3GHz的i5-8300H,总内存为16G。针对第一阶段数据,程序用时大约3s左右,针对第二阶段数据,程序用时大约50s左右。

一阶段数据结果

二阶段数据结果

实际上,此份代码是还可以进一步优化的,尤其是去重部分,还可以根据数据做些文章,我自己不是很感兴趣,没再继续下去了。