我有一组大约 200,000 个 IP 地址和 10,000 个 (1.1.1.1/24) 形式的子网。对于每个 IP 地址,我需要检查它是否属于这些子网之一,但由于它是一个如此大的数据集,而且我的计算能力较低,我希望对此有一个有效的实现。
在搜索时,我发现的一种方法是这个(https://stackoverflow.com/a/820124/7995937):
from netaddr import IPNetwork, IPAddress
if IPAddress("192.168.0.1") in IPNetwork("192.168.0.0/24"):
print "Yay!"
但是由于我必须循环超过 200,000 个 IP 地址,并且对于每个地址循环超过 10,000 个子网,我不确定这是否有效。 我的第一个疑问是,检查“IPNetwork() 中的 IPAddress()”只是线性扫描还是以某种方式进行了优化?
我想出的另一个解决方案是列出IP子网中包含的所有IP(大约有13,000,000个IP,没有重复),然后对其进行排序。如果我这样做,那么在 200,000 个 IP 地址的循环中,我只需要对更大的 IP 地址集上的每个 IP 进行二分搜索。
for ipMasked in ipsubnets: # Here ipsubnets is the list of all subnets
setUnmaskedIPs = [str(ip) for ip in IPNetwork(ipMasked)]
ip_list = ip_list + setUnmaskedIPs
ip_list = list(set(ip_list)) # To eliminate duplicates
ip_list.sort()
然后我可以通过以下方式执行二分搜索:
for ip in myIPList: # myIPList is the list of 200,000 IPs
if bin_search(ip,ip_list):
print('The ip is present')
这种方法比另一种方法更有效吗?或者还有其他更有效的方法来执行此任务吗?
好吧,所以排序需要 O(nlogn),如果是 13,000,000,你最终会做 O(13000000log(13000000))。然后,您将迭代 200000 个 IP,并在 13000000 个排序列表上进行二分搜索 O(logn)。 我真诚地怀疑这是最好的解决方案。我建议你使用地图
from netaddr import IPNetwork, IPAddress
l_ip_address = map(IPAddress, list_of_ip_address)
l_ip_subnet = map(IPNetwork, list_of_subnets)
if any(x in y for x in l_ip_address for y in l_ip_subnet):
print "FOUND"
这可能不是“最好”的解决方案,但我建议使用集合而不是列表。集合针对检查集合中是否存在任何给定值进行了优化,因此您可以用单个操作替换二分搜索。而不是:
ip_list = list(set(ip_list))
就做:
ip_set = set(ip_list)
然后代码的另一部分变成:
for ip in myIPList: # myIPList is the list of 200,000 IPs
if ip in ip_set:
print('The ip is present')
并且为了提高内存效率,您也可以跳过创建中间列表:
ip_set = set()
for ipMasked in ipsubnets:
ip_set.update([str(ip) for ip in IPNetwork(ipMasked)])
subnets[23]
。继续处理所有子网。
要查看 IP 地址是否在您的子网中,请以与 32 位数字相同的方式对 IP 地址进行编码
ipaddr
,然后(类似于未经测试的代码)
for N in range( 32, 0, -1)
mask = ( 0xffffffff >> (32-N) ) << (32-N)
if (ipaddr & mask) in subnets[N] :
# have found ipaddr in one of our subnets
break # or do whatever...
else
# have not found ipaddr
在集合中查找数字最坏的时间复杂度为 O(log N),其中 N 是集合中元素的数量。对于 IP 地址不在子网集中的最坏情况,此代码最多执行 32 次。如果预计大多数地址都存在,则可以进行优化,首先测试具有最多元素的集合。可能是这样
for N in ( 24, 16, 8, 29, 23, 28, 27, 26, 25, 22, 15, 21 ... )
或者您可以在运行时计算最佳序列。
netaddr
库
正是你所需要的:
IPSet
类。您可以将两个
IPSet
相交,从而得到另一个
IPSet
,您可以迭代以获取相交的所有 IP 地址。您可以使用 intersection
方法或 &
运算符来完成此操作。假设您在 addresses
中有地址,在
networks
中有网络,这就是您想要的:from netaddr import IPSet
intersection = IPSet(addresses) & IPSet(networks)
for address in intersection:
...
这将是相当最佳的。
具有多个随机 IP 地址和网络的 IPython 会话示例(10k 网络,只有 20k 个地址,因为您将看到 20k 已经足够慢),演示结果是正确的并进行性能比较:
In [1]: import random
In [2]: from netaddr import IPAddress, IPNetwork, IPSet
In [3]: addresses = [IPAddress(random.randint(0, 2**32)) for _ in range(20_000)]
In [4]: networks = [IPNetwork('%s/24' % (IPAddress(random.randint(0, 2**24) << 8),)) for _ in range(10_000)]
In [5]: def original():
...: for ip in addresses:
...: for net in networks:
...: if ip in net:
...: yield ip
...:
In [8]: set(original())
Out[8]:
{IPAddress('3.209.127.67'),
IPAddress('12.27.8.169'),
IPAddress('26.148.219.9'),
IPAddress('72.7.76.197'),
IPAddress('109.171.151.200'),
IPAddress('120.220.194.79'),
IPAddress('140.91.183.208'),
IPAddress('154.128.225.217'),
IPAddress('175.136.20.23'),
IPAddress('185.190.93.13'),
IPAddress('186.191.83.2'),
IPAddress('187.94.249.34'),
IPAddress('206.16.20.154'),
IPAddress('224.207.184.13')}
In [19]: set(original()) == set(IPSet(addresses) & IPSet(networks))
Out[19]: True
In [20]: %timeit set(original())
42 s ± 377 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [21]: %timeit set(IPSet(addresses) & IPSet(networks))
108 ms ± 3.5 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)