如何用 SQL 统计日志中五分钟内请求数超过 100 的 IP

某客户网站出现爬虫,需要我从一个月的日志中分析出五分钟内请求量超 100 的 IP。

输入文本是 CSV 格式,每个请求占一行:

2021-11-09 13:04:06,223.80.x.x
2021-11-09 13:04:08,223.211.x.x
...
2021-12-09 13:00:01,122.x.x.x

统计类的需求优先考虑用 SQL 来完成,为了方便我直接用 SQLite,首先创建数据库和表:

$ sqlite3 log.db
sqlite> CREATE TABLE log(time text, ip text);

接着将 log.csv 的数据入库:

sqlite> .mode csv
sqlite> .import log.csv log

接着步骤如下:

1、算出每条的请求时间计算出5分钟范围,例如 2021-11-09 13:02:44,5 分钟范围则是 2021-11-09 13:00:00 ~ 2021-11-09 13:05:00;

2、汇总统计时间范围。

完成第一步的重点在于如何计算时间范围,为了方便计算,先将日期字符串转为时间戳:

select time, strftime('%s', time, 'utc') from log

日期范围用取模运算计算,计算公式如下,时间戳以秒为单位,5 分钟即 300 秒:

起点时间:时间戳 - 时间戳 mod 300

终点时间:时间戳 + 时间戳 mod 300

这里我在 shell 中演示:

$ date +%s                      # 当前时间戳
1639378362
$ date -d @1639378362           # 转换为易读格式
2021年 12月 13日 星期一 14:52:42 CST
$ echo '1639378362 - 1639378362 % 300' | bc # 计算起点时间
1639378200
$ date -d @1639378200           # 格式转换
2021年 12月 13日 星期一 14:50:00 CST
$ echo '1639378362 + 1639378362 % 300' | bc # 计算终点时间
1639378524
$ date -d @1639378524           # 格式转换
2021年 12月 13日 星期一 14:55:24 CST

把这个思路转换为 SQL,“||”是字符串连接符,为了易读:

select (strftime('%s', time, 'utc') - strftime('%s', time, 'utc') % 300) || '~' || (strftime('%s', time, 'utc') - strftime('%s', time, 'utc') % 300 + 300) as range, ip from log;

下一步做一个 group by,将相同时间范围类的统计出来:

select (strftime('%s', time, 'utc') - strftime('%s', time, 'utc') % 300) || '~' || (strftime('%s', time, 'utc') - strftime('%s', time, 'utc') % 300 + 300) as range, ip, count(1) from log group by range, ip;

为了易读,做一次嵌套 SQL,把日期格式化下:

select
  datetime(substr(range, 0, 11), 'unixepoch', 'localtime') || '~' || datetime(substr(range, 12), 'unixepoch', 'localtime'), ip, total
from(select
        (strftime('%s', time, 'utc') - strftime('%s', time, 'utc') % 300) || '~' || (strftime('%s', time, 'utc') - strftime('%s', time, 'utc') % 300 + 300) as range, ip, count(1) as total
     from log
     group by range,ip)
where total >= 100;

查询结果:

"2021-11-30 14:45:00~2021-11-30 14:50:00",183.x.x.161,100
"2021-11-30 14:50:00~2021-11-30 14:55:00",183.x.x.161,175
"2021-11-30 14:50:00~2021-11-30 14:55:00",183.x.x.166,132
"2021-11-30 14:50:00~2021-11-30 14:55:00",183.x.x.171,112
"2021-11-30 14:50:00~2021-11-30 14:55:00",183.x.x.174,114
"2021-11-30 14:55:00~2021-11-30 15:00:00",183.x.x.161,177
...

验证无误后,将结果以 CSV 格式输出到文件中:

sqlite> .output result.csv
sqlite>   select
   ...>     datetime(substr(range, 0, 11), 'unixepoch', 'localtime') || '~' || datetime(substr(range, 12), 'unixepoch', 'localtime'), ip, total
   ...>   from(select
   ...>           (strftime('%s', time, 'utc') - strftime('%s', time, 'utc') % 300) || '~' || (strftime('%s', time, 'utc') - strftime('%s', time, 'utc') % 300 + 300)
as range, ip, count(1) as total
   ...>        from log
   ...>        group by range,ip)
   ...>   where total >= 100;
sqlite>