Mocking a DNS server with Twisted

Question (votes: 2, answers: 1)

I am trying to write a Twisted-based mock DNS server to run some tests.

Taking inspiration from this guide, I wrote a very simple server that resolves everything to 127.0.0.1:

from twisted.internet import defer, reactor
from twisted.names import dns, error, server


class MockDNSResolver:

    def _doDynamicResponse(self, query):
        name = query.name.name
        record = dns.Record_A(address=b"127.0.0.1")
        answer = dns.RRHeader(name=name, payload=record)
        authority = []
        additional = []
        return [answer], authority, additional

    def query(self, query, timeout=None):
        print("Incoming query for:", query.name)
        if query.type == dns.A:
            return defer.succeed(self._doDynamicResponse(query))
        else:
            return defer.fail(error.DomainError())


if __name__ == "__main__":
    clients = [MockDNSResolver()]
    factory = server.DNSServerFactory(clients=clients)
    protocol = dns.DNSDatagramProtocol(controller=factory)
    reactor.listenUDP(10053, protocol)
    reactor.listenTCP(10053, factory)
    reactor.run()

The above works fine with dig and nslookup (run from a different terminal):

$ dig -p 10053 @localhost something.example.org A +short
127.0.0.1

$ nslookup something.else.example.org 127.0.0.1 -port=10053
Server:     127.0.0.1
Address:    127.0.0.1#10053

Non-authoritative answer:
Name:   something.else.example.org
Address: 127.0.0.1

I also get the corresponding lines in the terminal that is running the server:

Incoming query for: something.example.org
Incoming query for: something.else.example.org
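The same check can be sketched from Python using only the standard library, which may be handy inside a test suite where dig is not available. This is a minimal sketch and not part of the original post: the function names are hypothetical, it handles only A records over UDP, and it assumes the mock server is listening on 127.0.0.1:10053 as above.

```python
import socket
import struct


def build_a_query(hostname, query_id=0x1234):
    """Encode a minimal DNS query for an A record (RFC 1035 wire format)."""
    # Header: id, flags (recursion desired), 1 question, no other records.
    header = struct.pack("!HHHHHH", query_id, 0x0100, 1, 0, 0, 0)
    # Question name: length-prefixed labels terminated by a zero byte.
    labels = hostname.rstrip(".").encode("ascii").split(b".")
    qname = b"".join(bytes([len(label)]) + label for label in labels) + b"\x00"
    # QTYPE=1 (A), QCLASS=1 (IN).
    return header + qname + struct.pack("!HH", 1, 1)


def _skip_name(message, offset):
    """Return the offset just past the (possibly compressed) name at offset."""
    while True:
        length = message[offset]
        if length == 0:
            return offset + 1
        if length & 0xC0 == 0xC0:  # compression pointer: two bytes, ends the name
            return offset + 2
        offset += 1 + length


def parse_a_answers(message):
    """Return the IPv4 addresses found in the answer section of a response."""
    _id, _flags, qdcount, ancount, _ns, _ar = struct.unpack("!HHHHHH", message[:12])
    offset = 12
    for _ in range(qdcount):
        offset = _skip_name(message, offset) + 4  # skip QTYPE and QCLASS
    addresses = []
    for _ in range(ancount):
        offset = _skip_name(message, offset)
        rtype, _rclass, _ttl, rdlength = struct.unpack(
            "!HHIH", message[offset:offset + 10]
        )
        offset += 10
        if rtype == 1 and rdlength == 4:
            addresses.append(socket.inet_ntoa(message[offset:offset + 4]))
        offset += rdlength
    return addresses


def resolve_via_mock(hostname, server=("127.0.0.1", 10053), timeout=2.0):
    """Send one A query over UDP and return the addresses in the reply."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.settimeout(timeout)
        sock.sendto(build_a_query(hostname), server)
        reply, _ = sock.recvfrom(512)
    return parse_a_answers(reply)
```

With the server running, `resolve_via_mock("something.example.org")` should return whatever address the mock serves, and the server terminal should log the query.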

Then, based on this section about making requests and this section about installing a custom resolver, I wrote the following code:

from twisted.internet import reactor
from twisted.names.client import createResolver
from twisted.web.client import Agent


d = Agent(reactor).request(b'GET', b'http://does.not.exist')
reactor.installResolver(createResolver(servers=[('127.0.0.1', 10053)]))


def callback(result):
    print('Result:', result)


d.addBoth(callback)
d.addBoth(lambda _: reactor.stop())

reactor.run()

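But this fails, and no lines appear in the server terminal. It looks as if the queries are not going to the mock server but to the system-defined one: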

Result: [Failure instance: Traceback: <class 'twisted.internet.error.DNSLookupError'>: DNS lookup failed: no results for hostname lookup: does.not.exist.
/.../venv/lib/python3.6/site-packages/twisted/internet/_resolver.py:137:deliverResults
/.../venv/lib/python3.6/site-packages/twisted/internet/endpoints.py:921:resolutionComplete
/.../venv/lib/python3.6/site-packages/twisted/internet/defer.py:460:callback
/.../venv/lib/python3.6/site-packages/twisted/internet/defer.py:568:_startRunCallbacks
--- <exception caught here> ---
/.../venv/lib/python3.6/site-packages/twisted/internet/defer.py:654:_runCallbacks
/.../venv/lib/python3.6/site-packages/twisted/internet/endpoints.py:975:startConnectionAttempts
]

I am using:

  • macOS 10.14.6, Python 3.6.6, Twisted 18.9.0
  • Linux Mint 19.1,Python 3.6.9,Twisted 19.7.0

Thanks for your help, and let me know if you need any other information.

Thanks!

python dns twisted
1 Answer (votes: 0)

The following works; I will update the answer to make it simpler.

from twisted.internet import reactor

from twisted.names import client
resolver = client.createResolver(servers=[('127.0.0.1', 10053)])
d = resolver.getHostByName('does.not.exist')

def callback(result):
    print('Result:', result)

d.addBoth(callback)

reactor.callLater(4, reactor.stop)
reactor.run()

I also changed the mock DNS resolver:

from twisted.internet import defer, reactor
from twisted.names import dns, error, server, common

class MockDNSResolver(common.ResolverBase):
    def _lookup(self, name, cls, type, timeout):
        print("Incoming query for:", name)
        record = dns.Record_A(address=b"127.0.0.5")
        answer = dns.RRHeader(name=name, payload=record)
        return defer.succeed(([answer], [], []))

if __name__ == "__main__":
    clients = [MockDNSResolver()]
    factory = server.DNSServerFactory(clients=clients)
    protocol = dns.DNSDatagramProtocol(controller=factory)
    reactor.listenUDP(10053, protocol)
    reactor.listenTCP(10053, factory)
    reactor.run()