风控杂谈 –如何使用sql自定义风险IP?

引言

在数字化时代,IP是最常见的网络数据,作为一个地址类型的数据,识别并管理风险IP是维护业务安全与应用安全的关键一环。本文将向风控领域的新手介绍如何使用SQL(结构化查询语言)来识别潜在的风险IP,从而提高我们的业务安全防护能力。

风险IP定义

简单来说,风险IP指的是那些可能进行恶意活动或异常行为的网络地址。这些IP可能涉及到网络攻击、欺诈活动,或是不正常的登录尝试等。通过识别这些IP,我们可以及时采取措施防范潜在的安全风险。

为什么要自定义风险IP?

  1. 不同企业的风险差异性:每个企业的网络环境和面临的安全威胁各不相同。自定义IP可以允许企业结合自身特点,定制不同的安全策略与基线。
  2. 特定业务的风险识别:不同的业务场景会遭遇不同类型的攻击。例如电商平台可能更关注IP是否有欺诈下单,刷单等;金融行业则更多关注账号是否安全,交易是否安全。
  3. 外购产品与业务适配性问题。黑产也分好多家,当你购买了一家的服务,发现大部分IP都是别人家的,黑产很少光顾你们,你就该郁闷了。外购产品因其来源并不明确,只能参考,不能准确依赖,否则引起大量客户投诉,风控人员并没有明确的说明向上交代。
    下面,我们就结合sql语句,用示例来说明一个自建风险IP的做法。

如何使用SQL来识别风险IP?

首先,我们需要一个存储了用户登录信息的数据表,假设名为 tb_user_result。这个表包含了如IP地址、用户ID、登录时间、会话ID和登录是否成功等关键信息。关于建表语句及生成的数据示例,会在下文给出。

问题提出

在当前的业务安全环境中,密码已经不是唯一的验证要素,除了一些验证码(各种滑动,图形等),还有手机短信验证码等。我们关注的特定情景是,当用户尝试使用账号和密码登录且被风控系统识别为潜在风险时,会要求用户进行额外的验证。如果在这一IP地址上用户未能完成或未通过这些验证措施,该IP便被视为风险IP。这要求我们精准地识别并处理这类风险IP,从而降低对业务的影响。

风险IP的一些标准

我们的目标是通过SQL查询来找出那些可能存在风险的IP地址。基于此,我们在利用postgresql定义了以下风险IP的标准:

注意,这些阈值需要跟随业务场景及用户数量进行灵活调整。本文只是随意举例。

  1. 登录验证未通过次数超过20次的IP:这可能意味着有人或程序在尝试破解密码。
  2. 只有失败记录且验证未通过次数超过15次的IP:这类IP可能表示一种持续的攻击尝试。
  3. 通过率低于20%的IP且在该IP上登录人数超过4人:如果一个IP地址大量尝试登录却很少成功,这可能是一个风险信号。
  4. 失败人数超过5个的IP:这意味着多个用户从同一IP地址尝试登录但大多数尝试都失败了。

SQL语句分析与提取

SELECT ip,
       COUNT(DISTINCT userid) AS distinct_user_count,
       COUNT(*) AS total_attempts,
       SUM(CASE WHEN is_success = '0' THEN 1 ELSE 0 END) AS total_failures,
       COUNT(DISTINCT CASE WHEN is_success = '0' THEN userid END) AS distinct_failed_users,
       (SUM(CASE WHEN is_success = '1' THEN 1 ELSE 0 END)::DECIMAL / COUNT(*)) * 100 AS success_rate
FROM tb_user_result
WHERE TO_TIMESTAMP(login_time, 'YYYY-MM-DD HH24:MI:SS') >= NOW() - INTERVAL '7 days'
GROUP BY ip
HAVING (SUM(CASE WHEN is_success = '0' THEN 1 ELSE 0 END) > 20) -- 条件1: 登录验证未通过20次
   OR (SUM(CASE WHEN is_success = '0' THEN 1 ELSE 0 END) > 15 AND SUM(CASE WHEN is_success = '1' THEN 1 ELSE 0 END) = 0) -- 条件2: 只有失败记录且超过15次
   OR ((SUM(CASE WHEN is_success = '1' THEN 1 ELSE 0 END)::DECIMAL / COUNT(*)) * 100 < 20 AND COUNT(DISTINCT userid) > 4) -- 条件3: 通过率低于20%且登录人数大于4人
   OR (COUNT(DISTINCT CASE WHEN is_success = '0' THEN userid END) > 5) -- 条件4: 失败人数大于5个

上述就是一个常见的分析过程。值得注意的是:

  • 阈值的设定,需要贴合业务场景
  • 同时,要注意业务的需求,设定的阈值要尽可能准确,不要有太多的误报

结论

理解和运用SQL来识别风险IP对于风控工作来说是一项宝贵的技能。通过这种方法,我们不仅能够更好地保护我们的业务,还可以深入了解潜在的业务安全威胁。希望这篇文章能帮助风控领域的新手更好地理解如何使用SQL作为强有力的工具来提高网络安全。

附录

建表语句

CREATE TABLE tb_user_result (  
    ip VARCHAR(15),  
    userid VARCHAR(50),  
    login_time VARCHAR(20),  
    sessionid VARCHAR(50),  
    is_success VARCHAR(1)  
);

生成数据的Python3代码

from faker import Faker  
import random  
import psycopg2  

fake = Faker()  
random.seed(42)  # 设置固定的随机种子,这个种子可以自己变

# 生成10个集中性IP和100个分散IP  
central_ips = [fake.ipv4() for _ in range(10)]  
dispersed_ips = [fake.ipv4() for _ in range(100)]  
all_ips = central_ips + dispersed_ips  

# 生成10000条数据  
data = []  
for _ in range(10000):  
    ip = random.choice(all_ips)  
    userid = fake.user_name()  
    login_time = fake.date_time_between(start_date="-30d", end_date="now").strftime("%Y-%m-%d %H:%M:%S")  
    sessionid = fake.uuid4()  

    # 控制is_success生成,以满足特定筛选条件  
    if ip in central_ips:  
        # 集中性IP有更高的失败概率  
        is_success = '0' if random.random() < 0.8 else '1'  
    else:  
        # 分散IP有正常的成功/失败概率  
        is_success = str(random.randint(0, 1))  

    data.append((ip, userid, login_time, sessionid, is_success))  

# 连接到你的数据库  
test_connection_dict = {  
    'host': "10.0.10.5",  
    'database': "test",  
    'user': "postgres",  
    'password': "1",  
    'port': 5432  
}  

connectstr = "host={} dbname={}  user={} password={} port={}".format(test_connection_dict.get('host'),  
                                                                     test_connection_dict.get('database'),  
                                                                     test_connection_dict.get('user'),  
                                                                     test_connection_dict.get('password'),  
                                                                     test_connection_dict.get('port'))  
conn = psycopg2.connect(connectstr)  
cur = conn.cursor()  

# 插入数据  
for record in data:  
    cur.execute(  
        "INSERT INTO tb_user_result (ip, userid, login_time, sessionid, is_success) VALUES (%s, %s, %s, %s, %s)",  
        record  
    )  

conn.commit()  
cur.close()  
conn.close()  

如果想多次演示

  • 更改随机数种子
  • 清空数据库的表,但保留表结构truncate table tb_user_result;
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇