[docker] Logging with rsyslog and forwarding nginx logs to a Python program

Notes on how I packaged rsyslog as a Docker image, read nginx's access log with it, and forwarded the log to a Python program.

Key point 1: nginx log configuration

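nginx must write its access log in JSON format, using the nginx.conf shown below. When nginx runs in Docker, mount this nginx.conf into the container as a volume, and mount /var/log/nginx/access.log out to the host.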

user  root;
worker_processes  1;
error_log  /var/log/nginx/error.log warn;
pid        /var/run/nginx.pid;
events {
    worker_connections  1024;
}
http {
    include       /etc/nginx/mime.types;
    default_type  application/octet-stream;
    #log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
    #                  '$status $body_bytes_sent "$http_referer" '
    #                  '"$http_user_agent" "$http_x_forwarded_for"';
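    # escape=json (nginx 1.11.8+) keeps each line valid JSON when a value contains quotes or backslashes
    log_format main escape=json '{"time_local": "$time_local", '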
   '"path": "$request_uri", '
   '"ip": "$remote_addr", '
   '"time": "$time_iso8601", '
   '"user_agent": "$http_user_agent", '
   '"user_id_got": "$uid_got", '
   '"user_id_set": "$uid_set", '
   '"remote_user": "$remote_user", '
   '"request": "$request", '
   '"status": "$status", '
   '"body_bytes_sent": "$body_bytes_sent", '
   '"request_time": "$request_time", '
   '"http_referrer": "$http_referer" }';
    access_log  /var/log/nginx/access.log  main;
    sendfile        on;
    #tcp_nopush     on;
    keepalive_timeout  65;
    #gzip  on;
    include /etc/nginx/conf.d/*.conf;
}
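
To check that a format like the one above yields lines a program can parse, here is a minimal Python sketch. The sample lines are made up for illustration and do not come from a real log, and the helper name parse_access_line is hypothetical. It also shows one pitfall: when log_format is declared without escape=json (available since nginx 1.11.8), nginx writes a double quote inside a value as \x22, which is not a legal JSON escape, so json.loads rejects the line.

```python
import json

# Hypothetical access-log line in the JSON format configured above.
GOOD_LINE = (
    '{"time_local": "01/Jan/2024:12:00:00 +0000", "path": "/api/v1/users", '
    '"ip": "10.0.0.1", "time": "2024-01-01T12:00:00+00:00", '
    '"user_agent": "curl/8.0", "user_id_got": "", "user_id_set": "", '
    '"remote_user": "", "request": "GET /api/v1/users HTTP/1.1", '
    '"status": "200", "body_bytes_sent": "512", "request_time": "0.004", '
    '"http_referrer": "" }'
)

# Hypothetical line where the user agent contained a double quote and
# nginx's default escaping turned it into \x22, which JSON does not allow.
BAD_LINE = r'{"user_agent": "bad\x22agent", "status": "200"}'


def parse_access_line(line):
    """Return the decoded dict, or None if the line is not valid JSON."""
    try:
        return json.loads(line)
    except json.JSONDecodeError:
        return None


record = parse_access_line(GOOD_LINE)
print(record["path"], record["status"], record["request_time"])
print(parse_access_line(BAD_LINE))
```

Note that every value, including status and request_time, arrives as a string, so a consumer has to convert numeric fields itself.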

Key point 2: rsyslog configuration and Dockerfile

Write a file named 51-nginx-forward.conf and place it under /etc/rsyslog.d/:

module(load="imfile")
input(type="imfile"
      File="/var/log/nginx/access.log"
      Tag="mywebsite:")
# omfwd module for forwarding the logs to another tcp server
if( $syslogtag == 'mywebsite:')  then {
  # Replace PYTHON_SERVER_IP with the IP address of the host running the Python program
  action(type="omfwd" target="PYTHON_SERVER_IP" port="6000" protocol="tcp"
            action.resumeRetryCount="100"
            queue.type="linkedList" queue.size="10000")
}
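
Since the action above sets no template, omfwd should fall back to rsyslog's traditional forwarding format, so the receiver sees a syslog header (priority, timestamp, hostname, tag) in front of the JSON that nginx wrote. The sketch below shows one way a receiver might separate the two. The sample line, the hostname web01 and the helper name split_forwarded_line are illustrative assumptions, not captured output.

```python
import json

# Hypothetical line as it might arrive over TCP: syslog header, then the tag
# "mywebsite:" from the imfile input, then the JSON payload.
SAMPLE = (
    b'<133>Jan  1 12:00:00 web01 mywebsite: '
    b'{"time": "2024-01-01T12:00:00+00:00", "path": "/api/v1/users", '
    b'"request": "GET /api/v1/users HTTP/1.1", "request_time": "0.004"}\n'
)


def split_forwarded_line(raw):
    """Split a forwarded line into (header, payload dict).

    Returns None when the line has no JSON object or it fails to parse.
    """
    text = raw.decode("utf-8", errors="ignore").strip()
    # Everything before the first "{" is treated as the syslog header.
    header, brace, rest = text.partition("{")
    if not brace:
        return None
    try:
        return header.strip(), json.loads(brace + rest)
    except json.JSONDecodeError:
        return None


header, payload = split_forwarded_line(SAMPLE)
print(header)
print(payload["path"])
```

Splitting on the first "{" relies on the header never containing a brace, which holds for the tag used here but is worth keeping in mind if the tag changes.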

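rsyslog can run in a container built from the Dockerfile below. When calling docker run, remember to mount the nginx log so that rsyslog can read /var/log/nginx/access.log.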

FROM ubuntu:16.04
RUN apt-get update && apt-get install -y rsyslog && \
    rm -rf /var/lib/apt/lists/*
COPY 51-nginx-forward.conf /etc/rsyslog.d/
# RUN cat /dev/null> /var/log/mail.log
CMD service rsyslog start && tail -f /var/log/syslog

Key point 3: the Python program reads from rsyslog over TCP

The Python program listens on TCP port 6000 and rsyslog connects to it, so each log line can be inserted into the database in real time.

import asyncio
import json
import time
import database_init
class LogAnalyser:
    def __init__(self):
        pass
    def process(self, str_input):
        # print(str_input)
        str_input = str_input.decode("utf-8", errors="ignore")
        # Add your processing steps here
        # ...
        try:
            # Extract created_at from the log string
            str_splits = str_input.split("{", 1)
            json_text = "{" + str_splits[1]
            data = json.loads(json_text)
            created_at = data["time"]
            # "GET /path HTTP/1.1" -> "GET"
            http_type = data["request"].split(" ", 1)[0]
            path = data["path"]
            request_time = data["request_time"]
            # PREFIX is a module-level name set in the __main__ block below
            if PREFIX in path:
                return http_type, path, created_at, request_time  # The order is relevant for INSERT query params
        except Exception as e:
            print("error in read_rsylog.py,Class LogAnalyser,function process")
            print(e)
        return None
# Called once per incoming TCP connection; reads the stream line by line
async def handle_echo(reader, writer):
    log_filter = LogAnalyser()
    while True:
        line = await reader.readline()
        if not line:
            break
        params = log_filter.process(line)
        if params:
            # Do the actual work here, for example insert the row into the database
            # execute_sql(params=params)
            pass
    writer.close()
if __name__ == '__main__':
    db = database_init.DBConnect()  # create the connection once and reuse it
    CURSOR = db.CURSOR
    CONN = db.CONN
    PREFIX = db.CONFIG["TEST_SWAGGER"]["PREFIX"]
    db.create_table()
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # host=None listens on all interfaces
    coro = asyncio.start_server(handle_echo, None, 6000)
    server = loop.run_until_complete(coro)
    # Serve requests until Ctrl+C is pressed
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    # Close the server
    print("Closing the server.")
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()
    CURSOR.close()
    CONN.close()
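
The execute_sql call above is left commented out and database_init is not shown. As a rough sketch of what the insert step could look like, the example below uses the standard-library sqlite3 module with an in-memory database as a stand-in. The table name access_log, its columns and this execute_sql signature are assumptions, not the actual schema used by the program above.

```python
import sqlite3

# In-memory database as a stand-in for the real one (assumption).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE access_log ("
    "http_type TEXT, path TEXT, created_at TEXT, request_time REAL)"
)


def execute_sql(params):
    """Insert one (http_type, path, created_at, request_time) tuple."""
    # Placeholders keep log-derived values out of the SQL text itself.
    conn.execute(
        "INSERT INTO access_log (http_type, path, created_at, request_time) "
        "VALUES (?, ?, ?, ?)",
        params,
    )
    conn.commit()


# Tuple in the same order that LogAnalyser.process returns.
execute_sql(("GET", "/api/v1/users", "2024-01-01T12:00:00+00:00", "0.004"))
rows = conn.execute(
    "SELECT http_type, path, request_time FROM access_log"
).fetchall()
print(rows)
```

Parameterized queries matter here because the path and request line come straight from client requests and should never be concatenated into SQL text.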
