1.需求
在数据清洗(ETL),日志文件分析,分隔符信息提取时,我们都会遇到如下常见的文本数据:
- 中楼层/14层,东西,西直门南大街 3号院,1985年建,板楼
- 中楼层/23层,南北,通惠南路6号,2003年建,板楼
- 中楼层/12层,南北,通惠南路6号 1号院,2003年建,塔楼-
一个常见的处理思路,是按照分隔符,对文本进行切割。对于上面的文本,可以采用/,两种符号来切割。变成如下的表格样式,之后进行数据处理便非常容易了。
所以我开玩笑的说,一门语言中split函数可能是用的最多的。在文本处理中会遇到大量的这种需求,但数据格式多变,总会有大量精力耗费在这类重复的工作上。于是很自然的会期待是否有一种自动算法,能帮助我们自动分割字符串?
假设已经为我们提供了一批文本,构成一个字符串数组。我们的任务分为两步:编译(发现内部的分隔符和模式)
def Compile(self,datas):
- 分隔符
- 相同的文本串(如上面的'楼层')
- 不同的文本串(如'14','12')
分割(根据发现的模式分割)
def Split(self,text,splitgroup,isSameOut=True):
2.一种简单的思路
从直觉上来说,逗号,空格,分号,冒号这类符号是最有可能出现的分隔符。一种朴素的想法是:
列出可能是分隔符的所有符号,绝大多数非字母的ascii码,都可以列入统计所有符号在每一行中出现的次数,构成一个数组字典,例如上面的例子python ',' : [3,3,3] '/' : [1,1,1] ' ' : [1,0,1]
求取每个数组的方差,如果满足小于特定的阈值,则可认为是一个分隔符
我们不能严格的认定,只有数组的元素全部一样才是分隔符:因为总会出现特殊情况,应当允许特殊情况的发生。方差的阈值,应通过参数传入。我一般将其定为0.1
将所有满足方差小于阈值的分隔符提取出来。对上面的例子,分隔符应该是 ,和/对样例数据进行预分割,分割之后,我们会发现:
标注相同的,是指该列所有的数据都是同一内容,否则为不一样内容。
最后,可以看出,该数据集可以按照斜杠和逗号进行分割。分割的第一项是相同项,可以选择不输出。
这种方法思路很简单,但是非常适合由计算机生成的网络数据,这些数据通常都有明确的格式,分隔符固定,因此速度较快,而且性能卓越。全部代码如下:
import re;from asq.initiators import querydef GetVariance(data): sum1 = 0.0 sum2 = 0.0 l = len(data); for i in range(l): sum1 += data[i] sum2 += data[i] ** 2 mean = sum1 / l var = sum2 / l - mean ** 2 return var;def GetMaxSameCount(datas): dic = {}; for t in datas: if t in dic: dic[t] += 1; else: dic[t] = 1; if len(dic) == 0: return 0; maxkey, maxvalue = None, -1; for key in dic: if dic[key] > maxvalue: maxvalue = dic[key]; maxkey = key; return (maxkey, maxvalue);class SplitType: (ENTITY, SPLIT, SAMECONTENT, DIFFCONTENT) = range(4)class SplitItem(object): def __init__(self): self.SplitType = None; self.Name = None; self.Value = None self.Index = 0; self.IsRepeat = False;class SplitGroup(object): def __init__(self): self.SplitChars = {}; # dict,key:char, value:charmaxcount self.SplitItems = [];class Spliter(object): def __init__(self): self.MatchRatio = 0.8 self.ModeCheckRatio = 0.3; self.MaxVariance = 3; self.spliter2 = u' \r\n\t./_"\',;():|[]{}。:;' self.spliter3 = re.compile(r'[a-zA-Z0-9\u4e00-\u9fa5\u3040-\u309f\u30a0-\u30ff]') self.spliterdict = [self.spliter2, self.spliter3]; def GetCharCount(self, string, char): count = 0; for c in string: if c == char: count += 1; return count; def Compile(self, datas): splititems = []; splitchars = []; maps = {}; datalen = len(datas); for data in datas: if data == None or data == '': continue; for splitchar in self.spliter2: charcount = self.GetCharCount(data, splitchar) if charcount == 0: continue; count = maps.get(splitchar, None); if count == None: maps[splitchar] = [charcount]; else: maps[splitchar].append(charcount); # select real splitchars for text in maps: map = maps[text]; if len(map) < datalen / 2: continue charcount = GetVariance(map); maxkey, maxvalue = GetMaxSameCount(map); if charcount < self.MaxVariance: splitchars.append(text) splitGroup = SplitGroup(); results = []; modedict = []; for data in datas: splitResult = self.Split(data, splitchars); results.append(splitResult); qresults = query(results); maxlen = qresults.max(lambda x: len(x)); samevalues = []; for i in range(0, maxlen): splititem = SplitItem(); splititem.Index = i; values = []; for splitResult in results: if i < len(splitResult): if splititem.SplitType == None and splitResult[i] in splitchars: splititem.SplitType = SplitType.SPLIT; splititem.Value = splitResult[i]; values.append(splitResult[i]); if splititem.SplitType == None: text, value = GetMaxSameCount(values) if value > len(values) * self.MatchRatio: splititem.SplitType = SplitType.SAMECONTENT; splititem.Value = text; if text in samevalues: splititem.IsRepeat = True; else: samevalues.append(text); else: splititem.SplitType = SplitType.DIFFCONTENT; splititems.append(splititem) splitGroup.SplitChars = splitchars; splitGroup.SplitItems = splititems; # post process return splitGroup; def SplitWithGroup(self, text, splitgroup, isSameOut=True, issplitOut=False): results = self.Split(text, splitgroup.SplitChars); splitIndex = 0; for r in results: currp = splitgroup.SplitItems[splitIndex]; if r in splitgroup.SplitChars: while splitgroup.SplitItems[splitIndex].Value != r: splitIndex += 1; if splitIndex == len(splitgroup.SplitItems): return; if issplitOut == False: splitIndex += 1; continue; splitIndex += 1; if currp.SplitType == SplitType.SAMECONTENT: if isSameOut == False: continue; yield r; def Split(self, data, splits): # 连续的分隔符会被合并? if data is None: return None; if len(splits) == 0: return [data]; last = -1; splititems = []; l = len(data); for i in range(0, l): r = data[i]; if r not in splits: continue; else: if i > 0 and i > last + 1: splititems.append(data[last + 1:i]); splititems.append(r); last = i if last + 1 < len(data): splititems.append(data[last + 1:]); return splititems;if __name__ == '__main__': sp = Spliter(); spgroups = sp.Compile(['中楼层/14层,东西,西直门南大街 3号院,1985年建,板楼' , '中楼层/23层,南北,通惠南路6号,2003年建,板楼', '中楼层/12层,南北,通惠南路6号 1号院,2003年建,塔楼']) for r in sp.SplitWithGroup(u"低楼层/14层,东西,太阳宫中路太阳宫大厦,2003年建,板楼", spgroups): print(r)
sp是分割器实例,对文本数组编译后获得了spgroups,这个数据结构存储了分割所需的信息。之后使用SplitWithGroup方法,即可对文本进行分割,返回的是一个生成器。该函数的一系列参数可以指定是否输出相同项:
3. 其他可能的方法
文本对齐算法
上文算法虽然简单,但却不能处理所有的情况。以下情况就很难处理:
- 样本数量非常少- 分隔符数量不一致- 上文方法可能会带来严重的错误。采用对齐的思路进行分析。对齐的意思是很容易理解的,但如何让计算机认为是对齐呢?这就依赖于字符串的编辑距离。对齐算法有大量可供参考的源代码和论文,由于不是作者亲自实现的,因此就不多做介绍。这种算法使用了动态规划技术,相对简单,但内存占用量是巨大的,如果字符串长度很长,那么将会开辟很大的空间进行计算。另外,它是两两计算的,如果给出的是一组数据,如何合并最后结果也是比较复杂的问题。
4.歧义性
分割不一定是好的,这要看所考虑的问题的环境。例如12:03是一个时间,如果直接按照冒号分割,之后的算法就再也无法判断是不是时间了。分割造成了原有语境的丢失。
解决这个问题的思路有两种:软分割
与其纠结于要不要分割,倒不如换一种角度考虑问题。树结构给了很好地启发,在一个层级上看,它们是一个字段,但看得更深一些,则是两个字段。因此,12:03被划分为两个子节点12和03,但从父节点上来看,却依旧是12:03。
树结构比列表带来了更强大的表现能力,具体可参考《》一文。自动确定层级关系
考虑下面的数据:
LGA,MBA,12:03,15:04,165MBA,LGA,18:30,22:40,174可以认定,分隔符有:和, 但哪个符号的优先级更高呢?我们当然知道是冒号,但计算机如何知道?一种基本可行的方案,是确定分割后,不同子项的模式是否相似。如果优先按逗号分割,则有\w+,一个\d+,两个dd:dd的模式但如优先按冒号,则会变成 \w+,\w+,\d{2} 以及\d{2} \d{2},\d{3}显然,按照逗号分割,子项的模式相似度更高。优先使用逗号分隔。这样,就能转换为一颗树。与之而来的问题,是如何确定不同子项的模式相似度更高?
我对此的定义是,不同子项内部的分隔符的数量和排布相似性更高。使用欧拉距离即可计算。不过,这种基于相似性的策略,总有可能出错的情况。5. 总结
经过笔者近半年的试用,这种算法在处理网页数据时非常方便,能够轻松地对不同文本进行切分。当然算法比较简单,用的也是最朴素的“方差”来评估分隔符的可能性。如果有更好的办法,欢迎留言。