python-日志分析步骤

日志分析

一般采集流程

日志产出 -> 采集(Logstash、Flume、Scribe) -> 存储 -> 分析 -> 存储(数据库、NoSQL) -> 可视化

开源实时日志分析ELK平台

Logstash收集日志,并存放到ElasticSearch集群中,Kibana则从ES集群中查询数据生成图表,返回浏览器端

数据提取

数据
非结构化数据

一眼看不出结构的数据。(二进制的,无法用文本理解)

半结构化数据

日志是半结构化数据,是有组织的,有格式的数据。可以分割成行和列,就可以当做表理解和处理了,当然也可以分析里面的数据。

结构化数据

数据库内的数据(能够像是行和列一样很好的组织起来)

文本分析

日志是文本文件,需要依赖文件IO、字符串操作、正则表达式等技术。
通过这些技术就能够把日志中需要的数据提取出来。4

这是最常见的日志,nginx、tomcat等WEB Server都会产生这样的日志。 这里面每一段有效的数据对后期的分析都是必须的。

1
2
183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] "GET /o2o/media.html?menu=3 HTTP/1.1" 200 16691 
"-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"

思路:如果用空格切割,数据并没有按照业务分割好,比如时间就被分开了,URL相关的也被分开了,User Agent的空格最多,被分割了。 所以,定义的时候不选用这种在filed中出现的字符就可以省很多事,例如使用’\x01’、‘\0x02’这个不可见的ASCII。

1
2
for field in line.split("\x02"):
print(field)

类型转换

fields中的数据是有类型的,例如时间、状态码等。对不同的field要做不同的类型转换,甚至是自定义的转换

时间转换
19/Feb/2013:10:23:29 +0800 对应格式是
%d/%b/%Y:%H:%M:%S %z
使用的函数是datetime类的strptime方法

1
2
3
4
5
6
import datetime

def convert_time(timestr):
return datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z')
#lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'

请求信息的解析

1
2
3
4
5
6
#GET /o2o/media.html?menu=3 HTTP/1.1 
#method url protocol 三部分都非常重要
def get_request(request:str):
return dict(zip(['method','url','protocol'],request.split()))

#lambda request: dict(zip(['method','url','protocol'],request.split()))

映射
对每一个字段命名,然后与值和类型转换的方法对应。解析每一行是有顺序的

映射
对每一个字段命名,然后与值和类型转换的方法对应。解析每一行是有顺序的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import datetime


line = '''183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] \
"GET /o2o/media.html?menu=3 HTTP/1.1" 200 16691 "-" \
"Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"'''

CHARS = set(' \'"[]')
print(CHARS)

def makekey(line:str):
start = 0
flag = False
stopchar = ''
for i, c in enumerate(line): # [a]
if c in CHARS:
if c == '[':
flag = True
start = i + 1
if c == ']':
flag = False
if c == '"':
flag = not flag
if flag:
start = i + 1
if flag:
continue
if start == i:
start = i + 1
continue
yield line[start:i]
start = i + 1
if start < len(line):
yield line[start:]
names = ('remote', '', '', 'datetime', 'request', 'status', 'length', '', 'useragent')
ops = (None, None, None, lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S
lambda request: dict(zip(['method', 'url', 'protocol'], request.split())),
int, int, None, None
def extract(line:str):
return dict(map(lambda item: (item[0], item[2](item[1]) if item[2] is not None else
item[1]),zip(names, makekey(line), ops)))
print(extract(line))

正则表达式的提取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import datetime
import re


line = '''183.60.212.153 - - [19/Feb/2013:10:23:29 +0800] \
"GET /o2o/media.html?menu=3 HTTP/1.1" 200 16691 "-" \
"Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"'''

ops = {
'datetime': lambda timestr: datetime.datetime.strptime(timestr, '%d/%b/%Y:%H:%M:%S %z'),
'status': int,
'length': int
}

pattern = '''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[/\w +:]+)\] \
"(?P<method>\w+) (?P<url>\S+) (?P<protocol>[\w/\d.]+)" \
(?P<status>\d+) (?P<length>\d+) .+ "(?P<useragent>.+)"'''

regex = re.compile(pattern)

def extract(line:str) -> dict:
matcher = regex.match(line)
return {k:ops.get(k, lambda x:x)(v) for k,v in matcher.groupdict().items()}

print(extract(line))

异常处理
日志中不免会出现一些不匹配的行,需要处理。

这里使用re.match方法,有可能匹配不上。所以要增加一个判断
采用抛出异常的方式,让调用者获得异常并自行处理。

1
2
3
4
5
6
7
8
def extract(logline:str) -> dict:
"""返回字段的字典,如果返回None说明匹配失败"""
matcher = regex.match(line)
if matcher:
return {k:ops.get(k, lambda x:x)(v) for k,v in matcher.groupdict().items()}
else:
return None # 或输出日志记录

数据载入

1
2
3
4
5
6
7
8
9
def load(path):
"""装载日志文件"""
with open(path) as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue # TODO 解析失败就抛弃,或者打印日志