记录我是如何把 rsyslog 做成 docker 镜像,读取 nginx 的 access log,并把日志转发给 Python 程序的。
关键点1 nginx日志配置
nginx 日志要设置成 JSON 格式输出,nginx.conf 如下所示。在 docker 中可以通过 volume 把 nginx.conf 挂载进容器,并把容器内的 /var/log/nginx/access.log 挂载到本地,供 rsyslog 读取。
# nginx main configuration: writes every access-log record as one JSON
# object per line to /var/log/nginx/access.log so that a downstream reader
# can parse it with a plain JSON decoder.
user root;
worker_processes 1;
error_log /var/log/nginx/error.log warn;
pid /var/run/nginx.pid;

events {
    worker_connections 1024;
}

http {
    include /etc/nginx/mime.types;
    default_type application/octet-stream;

    #log_format main '$remote_addr - $remote_user [$time_local] "$request" '
    # '$status $body_bytes_sent "$http_referer" '
    # '"$http_user_agent" "$http_x_forwarded_for"';

    # escape=json (nginx >= 1.11.8) makes nginx JSON-escape variable values.
    # Without it a double quote or backslash inside $http_user_agent,
    # $request or $http_referer is written as "\x22"/"\x5C", which is not
    # valid JSON and makes the whole record unparseable.
    # Side effect: variables with no value are logged as "" instead of "-".
    log_format main escape=json '{"time_local": "$time_local", '
                    '"path": "$request_uri", '
                    '"ip": "$remote_addr", '
                    '"time": "$time_iso8601", '
                    '"user_agent": "$http_user_agent", '
                    '"user_id_got": "$uid_got", '
                    '"user_id_set": "$uid_set", '
                    '"remote_user": "$remote_user", '
                    '"request": "$request", '
                    '"status": "$status", '
                    '"body_bytes_sent": "$body_bytes_sent", '
                    '"request_time": "$request_time", '
                    '"http_referrer": "$http_referer" }';
    access_log /var/log/nginx/access.log main;

    sendfile on;
    #tcp_nopush on;
    keepalive_timeout 65;
    #gzip on;

    include /etc/nginx/conf.d/*.conf;
}
关键点2 rsyslog配置与Dockerfile
编写一个51-nginx-forward.conf文件放置在/etc/rsyslog.d/下即可
# Drop-in rsyslog rule: read the nginx access log file and forward each line
# over TCP to the Python log receiver.
# imfile is the input module that lets rsyslog follow plain text files.
module(load="imfile")
# Follow the nginx access log; every line read from it is tagged "mywebsite:".
input(type="imfile"
File="/var/log/nginx/access.log"
Tag="mywebsite:")
# omfwd module for forwarding the logs to another tcp server
# Only messages carrying the tag assigned by the input above are forwarded.
if( $syslogtag == 'mywebsite:') then {
# The target value below is a placeholder and must be replaced with the
# address of the host running the Python receiver (listening on port 6000).
# The action is retried up to 100 times when the receiver is unreachable and
# buffers up to 10000 messages in an in-memory linked-list queue meanwhile.
action(type="omfwd" target="python服务器IP地址" port="6000" protocol="tcp"
action.resumeRetryCount="100"
queue.type="linkedList" queue.size="10000")
}
我们可以用一个Dockerfile来运行rsyslog,docker run的时候注意日志的挂载
# Image that runs rsyslog with the nginx forwarding rule installed.
# The nginx access log must be bind-mounted into the container at
# /var/log/nginx/access.log when running it.
FROM ubuntu:16.04

# Chain every step with && so that a failed `apt-get install` fails the build.
# With `;` the exit status of the trailing `rm` would hide the failure and
# produce an image without rsyslog.
RUN apt-get update \
    && apt-get install -y rsyslog \
    && rm -rf /var/lib/apt/lists/*

# COPY is enough for a plain local file; ADD's archive/URL handling is not needed.
COPY 51-nginx-forward.conf /etc/rsyslog.d/

# RUN cat /dev/null> /var/log/mail.log

# Start the rsyslog service, then tail the syslog file so the container keeps
# a foreground process.
CMD service rsyslog start && tail -f /var/log/syslog
关键点3 python程序通过tcp的方式读取rsyslog
Python 程序作为 TCP 服务端监听 6000 端口,rsyslog 主动连接并逐行推送日志,这样就可以实时地执行数据库插入语句。
import asyncio
import json
import time
import database_init
class LogAnalyser:
    """Convert raw rsyslog lines carrying an nginx JSON record into DB parameters.

    Each forwarded line consists of a syslog header followed by the JSON
    object written by nginx; everything before the first ``{`` is discarded.

    NOTE(review): the indentation of the original source was lost, so it is
    ambiguous whether the prefix check guarded the ``return``. This
    implementation assumes it did, i.e. records whose path does not contain
    the prefix are dropped. Confirm against the intended behaviour.
    """

    def __init__(self, prefix=None):
        """Create an analyser.

        Args:
            prefix: Substring a request path must contain for the record to be
                kept. When ``None`` the module-level ``PREFIX`` is used if it
                exists; if neither is set, no path filtering is applied.
        """
        self.prefix = prefix

    def process(self, str_input):
        """Parse one log line.

        Args:
            str_input: One line as ``bytes``, as read from the TCP stream.

        Returns:
            ``(http_type, path, created_at, request_time)`` for a record that
            passes the prefix filter, otherwise ``None`` (filtered out or
            unparseable). The tuple order matters for the INSERT query params.
        """
        str_input = str_input.decode("utf-8", errors="ignore")

        # Resolve the prefix lazily: the script defines PREFIX under
        # ``__main__`` after this class is created, and the module may also be
        # imported without it being defined at all.
        prefix = self.prefix
        if prefix is None:
            prefix = globals().get("PREFIX")

        try:
            # Strip the syslog header: the JSON payload starts at the first "{".
            json_text = "{" + str_input.split("{", 1)[1]
            data = json.loads(json_text)
            created_at = data["time"]
            # The request line is "<METHOD> <target> <protocol>"; split on the
            # first space so targets that do not start with "/" (CONNECT,
            # OPTIONS *, absolute URIs) still yield just the method.
            http_type = data["request"].split(" ", 1)[0]
            path = data["path"]
            request_time = data["request_time"]
            if prefix is not None and prefix not in path:
                return None
        except (IndexError, KeyError, TypeError, ValueError, AttributeError) as e:
            # Best effort: a malformed line is reported and skipped.
            print("error in read_rsylog.py,Class LogAnalyser,function process")
            print(e)
            return None

        return http_type, path, created_at, request_time
async def handle_echo(reader, writer):
    """Handle one TCP connection from rsyslog, processing it line by line.

    Written as a native coroutine: the generator-based ``@asyncio.coroutine``
    decorator was removed in Python 3.11.

    Args:
        reader: ``asyncio.StreamReader`` for the incoming connection.
        writer: ``asyncio.StreamWriter`` for the same connection; nothing is
            written to it, it is only closed when the peer disconnects.
    """
    log_filter = LogAnalyser()
    try:
        while True:
            line = await reader.readline()
            if not line:
                # An empty read means the peer closed the connection.
                break
            params = log_filter.process(line)
            if not params:
                # Unparseable or filtered-out line: nothing to store.
                continue
            # Do the real work here, e.g. insert the record into the database.
            # execute_sql(params=params)
    finally:
        # Release the socket once the connection is finished.
        writer.close()
if __name__ == '__main__':
    # Build the database helper once and take every handle from that single
    # instance, so the cursor, the connection and the config belong together.
    # NOTE(review): assumes DBConnect() has no required per-call side effects
    # that relied on it being constructed several times - confirm.
    db = database_init.DBConnect()
    CURSOR = db.CURSOR
    CONN = db.CONN
    PREFIX = db.CONFIG["TEST_SWAGGER"]["PREFIX"]
    db.create_table()

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    server = None
    try:
        # Listen on all interfaces, port 6000 (the port rsyslog forwards to).
        # The ``loop`` argument is not passed: it was removed in Python 3.10
        # and the server binds to the loop that runs this coroutine anyway.
        server = loop.run_until_complete(
            asyncio.start_server(handle_echo, None, 6000)
        )
        # Serve requests until Ctrl+C is pressed
        print('Serving on {}'.format(server.sockets[0].getsockname()))
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            pass
    finally:
        # Close the server, then the loop and the database handles, even when
        # start-up or serving failed.
        print("Closing the server.")
        if server is not None:
            server.close()
            loop.run_until_complete(server.wait_closed())
        loop.close()
        CURSOR.close()
        CONN.close()