日志分析
一般采集流程
日志产出 -> 采集(Logstash、Flume、Scribe) -> 存储 -> 分析 -> 存储(数据库、NoSQL) -> 可视化
开源实时日志分析ELK平台
Logstash收集日志,并存放到ElasticSearch集群中,Kibana则从ES集群中查询数据生成图表,返回浏览器端
数据提取
数据
非结构化数据
一眼看不出结构的数据。(二进制的,无法用文本理解)
半结构化数据
日志是半结构化数据,是有组织的,有格式的数据。可以分割成行和列,就可以当做表理解和处理了,当然也可以分析里面的数据。
结构化数据
数据库内的数据(能够像是行和列一样很好的组织起来)
文本分析
日志是文本文件,需要依赖文件IO、字符串操作、正则表达式等技术。
通过这些技术就能够把日志中需要的数据提取出来。4
这是最常见的日志,nginx、tomcat等WEB Server都会产生这样的日志。 这里面每一段有效的数据对后期的分析都是必须的。
1 2
| 183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 HTTP/1.1" 200 16691 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"
|
思路:如果用空格切割,数据并没有按照业务分割好,比如时间就被分开了,URL相关的也被分开了,User Agent的空格最多,被分割了。 所以,定义的时候不选用这种在filed中出现的字符就可以省很多事,例如使用’\x01’、‘\0x02’这个不可见的ASCII。
1 2
| for field in line.split("\x02"): print(field)
|
类型转换
fields中的数据是有类型的,例如时间、状态码等。对不同的field要做不同的类型转换,甚至是自定义的转换
时间转换
19/Feb/2013:10:23:29 +0800 对应格式是
%d/%b/%Y:%H:%M:%S %z
使用的函数是datetime类的strptime方法
1 2 3 4 5 6
| import datetime def convert_time(timestr): return datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z')
|
请求信息的解析
1 2 3 4 5 6
|
def get_request(request:str): return dict(zip(['method','url','protocol'],request.split()))
|
映射
对每一个字段命名,然后与值和类型转换的方法对应。解析每一行是有顺序的
映射
对每一个字段命名,然后与值和类型转换的方法对应。解析每一行是有顺序的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| import datetime line = '''183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] \ "GET /o2o/media.html?menu=3 HTTP/1.1" 200 16691 "-" \ "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"''' CHARS = set(' \'"[]') print(CHARS) def makekey(line:str): start = 0 flag = False stopchar = '' for i, c in enumerate(line): if c in CHARS: if c == '[': flag = True start = i + 1 if c == ']': flag = False if c == '"': flag = not flag if flag: start = i + 1 if flag: continue if start == i: start = i + 1 continue yield line[start:i] start = i + 1 if start < len(line): yield line[start:] names = ('remote', '', '', 'datetime', 'request', 'status', 'length', '', 'useragent') ops = (None, None, None, lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S lambda request: dict(zip(['method', 'url', 'protocol'], request.split())), int, int, None, None def extract(line:str): return dict(map(lambda item: (item[0], item[2](item[1]) if item[2] is not None else item[1]),zip(names, makekey(line), ops))) print(extract(line))
|
正则表达式的提取
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| import datetime import re line = '''183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] \ "GET /o2o/media.html?menu=3 HTTP/1.1" 200 16691 "-" \ "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"''' ops = { 'datetime': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'), 'status': int, 'length': int } pattern = '''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[/\w +:]+)\] \ "(?P<method>\w+) (?P<url>\S+) (?P<protocol>[\w/\d.]+)" \ (?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"''' regex = re.compile(pattern) def extract(line:str) -> dict: matcher = regex.match(line) return {k:ops.get(k, lambda x:x)(v) for k,v in matcher.groupdict().items()} print(extract(line))
|
异常处理
日志中不免会出现一些不匹配的行,需要处理。
这里使用re.match方法,有可能匹配不上。所以要增加一个判断
采用抛出异常的方式,让调用者获得异常并自行处理。
1 2 3 4 5 6 7 8
| def extract(logline:str) -> dict: """返回字段的字典,如果返回None说明匹配失败""" matcher = regex.match(line) if matcher: return {k:ops.get(k, lambda x:x)(v) for k,v in matcher.groupdict().items()} else: return None
|
数据载入
1 2 3 4 5 6 7 8 9
| def load(path): """装载日志文件""" with open(path) as f: for line in f: fields = extract(line) if fields: yield fields else: continue
|