目录

GO语言开发实践-并发探测局域网内存活 ip

高并发批量探测存活 ip 。

动机

我们需要用 go 语言写一个工具来探测局域网内指定网段的在线主机,并记录成功探测到的 ip ,以供后续程序使用。

方案

  1. 使用开源工具。

  2. 自己用代码实现。

开源工具

开源工具考虑过使用 nmap 。

使用 nmap 以及 go 语言封装过 nmap 的第三方库,都不好用,主要是在高并发的场景下, nmap 要花很久的时间才能扫完整个网段。

代码实现

一开始考虑使用 “github.com/go-ping/ping” 来做 ping 的探测,但是高并发场景下,工具好像有点问题。

于是乎我们打算自己通过 net 包自己实现一个 tcp 探测的工具。

编码

我们的工具是跑在 kubernetes 集群内的,是以 cronjob 的形式运行的,所以,具体的实现思路是这样的:

  1. 配置地区、ip 段、端口、并发限制数这几个关键变量。

  2. 对提供的 ip 段进行 ip 扫描。

  3. 将探测结果序列化后存入 redis ,供消费组使用。

具体逻辑详见代码注释,这里不做过多赘述。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package cmd

import (
    "context"
    "encoding/json"
    "log"
    "net"
    "sync"
    "time"

    "ipscaner/common"
    c "ipscaner/conf"
    "ipscaner/pkg/utils"

    "github.com/pkg/errors"
)

type TcpScaner struct {
    Zone           string
    Subnet         string
    Ports          []string
    Timeout        string
    Limit          int
    ReachableIps   []string
    UnreachableIps []string
    WG             sync.WaitGroup
    Lock           sync.Mutex
}

func NewTcpScaner() Scaner {
    return &TcpScaner{
        Zone:           c.GetConfig().Zone,
        Subnet:         c.GetConfig().Subnet,
        Ports:          c.GetConfig().Ports,
        Limit:          c.GetConfig().Limit,
        ReachableIps:   make([]string, 0),
        UnreachableIps: make([]string, 0),
    }
}

func (ts *TcpScaner) StoreUnreachableIps(ip string) {
    ts.Lock.Lock()
    ts.UnreachableIps = append(ts.UnreachableIps, ip)
    ts.Lock.Unlock()
}

func (ts *TcpScaner) StoreReachableIps(ip string) {
    ts.Lock.Lock()
    ts.ReachableIps = append(ts.ReachableIps, ip)
    ts.Lock.Unlock()
}

func (ts *TcpScaner) Scan() error {
    ips, err := utils.SubNetGet(ts.Subnet)
    if err != nil {
        return errors.Wrap(err, "解析网段失败,请提供类似 192.168.1.0/24 格式的网段。")
    }
    // 利用 channel 限速
    limit := make(chan bool, ts.Limit)

    // 循环取 ip ,再循环对给定的 port 探测
    for _, ip := range ips {
        limit <- true
        ts.WG.Add(1)
        go func(ip string, wg *sync.WaitGroup) {
            // 每个端口都失败,则主机不存在,只要有一个成功就是主机存活的
            errResult := make([]error, 0)
            okResult := make([]bool, 0)
            for _, port := range ts.Ports {
                ok, err := tcpAlive(ip + ":" + port)
                if err != nil {
                    errResult = append(errResult, err)
                }
                okResult = append(okResult, ok)
            }
            if len(errResult) == len(ts.Ports) {
                ts.StoreUnreachableIps(ip)
            }
            for _, o := range okResult {
                if o {
                    ts.StoreReachableIps(ip)
                    continue
                }
            }
            wg.Done()
            <-limit
        }(ip, &ts.WG)
    }
    ts.WG.Wait()

    log.Printf("探测局域网 ip 完成, 机器总共有 %v 台。\n", len(ts.ReachableIps))
    return nil
}

func (ts *TcpScaner) Upload() {
    ctx := context.Background()
    // 拿到 redis 连接
    conn := common.DB
    defer conn.Close()

    // 数据组合,zone 区域标识 + 地区指定的所有 ips
    data := map[string][]string{
        ts.Zone: ts.ReachableIps,
    }

    // 转成 []byte 存入 redis
    bdata, _ := json.Marshal(data)

    err := conn.Publish(ctx, "idc:subnet:ips", bdata).Err()

    if err != nil {
        errors.Wrapf(err, "redis 写入失败: %s", err.Error())
    }
    log.Println("已将机器组信息上传至 redis 服务器。")
}

// 扫描 ip 和传入 redis
func (ts *TcpScaner) ScanAndUpload() {
    err := ts.Scan()
    if err != nil {
        errors.Wrapf(err, "扫描 ip 失败: %s", err.Error())
    }
    ts.Upload()
}

// tcp 探测
func tcpAlive(ip string) (bool, error) {
    tcpTimeOut, err := time.ParseDuration(c.GetConfig().TcpTimeout)
    if err != nil {
        panic(err)
    }
    conn, err := net.DialTimeout("tcp", ip, tcpTimeOut)
    if err != nil {
        return false, errors.Wrapf(err, "%s tcp 探测失败: %s", ip, err.Error())
    }
    conn.Close()
    return true, nil
}

里面比较重要的是 util 库里面的 ip 自增。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package utils

import (
    "net"
)

// 获取子网信息
func SubNetGet(subnet string) ([]string, error){
    var ips []string
    ip, ipNet, err := net.ParseCIDR(subnet)
    if err != nil {
        return ips, err
    }

    // 根据子网网段信息,实现 ip 自增
    for ip := ip.Mask(ipNet.Mask); ipNet.Contains(ip); inc(ip) {
        ips = append(ips, ip.String())
    }
    if len(ips) > 2 {
        ips = ips[1 : len(ips)-1]
    }
    return ips, nil
}

// []byte 加到 255 再加 1 就归零了,利用这个特性来做 ip 自增
// 下面的函数例子:192.168.1.255 + 1 --> 192.168.2.255
func inc(ip net.IP) {
    for j := len(ip) - 1; j >= 0; j-- {
        ip[j]++
        if ip[j] > 0 {
            break
        }
    }
}

测试了一下,并发数量不做限制的话, 192.168.0.0/16 这个大网段仅需 13s 左右就完成扫描并上传到 redis 了。