我有 3 个源代码文件(主文件和从中导入的)。
test.py - 只需导入文件
import test_child
test_child.py - 调用导入文件中定义的函数
import test_grandchild
test_grandchild.check_sshd()
test_grandchild.py - 定义函数
import paramiko
def check_sshd():
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
print('checking ssh connection to local host')
client.connect('127.0.0.1')
client.close()
print('finished')
check_sshd()
是一个仅用于检查 sshd 是否在本地主机上运行的函数。所以,我希望在 sshd 未运行时发生身份验证异常,否则会出现套接字错误。
当我运行test_child.py时,结果符合我的预期。但是,paramiko 客户端卡在
connect()
上,我必须终止该进程。
$ python test.py
checking ssh connection to local host
^CTraceback (most recent call last):
File "test.py", line 4, in <module>
import test_child
File "/root/tests/test_child.py", line 6, in <module>
test_grandchild.check_sshd()
File "/root/tests/test_grandchild.py", line 10, in check_sshd
client.connect('127.0.0.1')
File "build/bdist.linux-x86_64/egg/paramiko/client.py", line 242, in connect
File "build/bdist.linux-x86_64/egg/paramiko/transport.py", line 342, in start_client
File "/usr/lib64/python2.7/threading.py", line 621, in wait
self.__cond.wait(timeout, balancing)
File "/usr/lib64/python2.7/threading.py", line 361, in wait
_sleep(delay)
KeyboardInterrupt
我该如何解决这个问题?
CentOS 7 上的 Paramiko 1.12.4
谢谢。
Paramiko使用Threading库将任务分配给不同的线程,并设置事件,然后它们相互通信并返回输出。这里的问题在于 test.py 你只是导入 test_child 而不是专门调用它的任何函数。
因此调用 check_sshd() 的执行点是 test_child.py 而不是 test.py。因此未设置线程事件。因此,为此,您要么从同一个文件调用函数,要么需要专门从 test.py 调用函数,而不仅仅是导入文件。
测试.py
import test_child
test_child.check_ssh_connection()
test_child.py
import test_grandchild
def check_ssh_connection():
test_grandchild.check_sshd()
test_grandchild.py
import paramiko
def check_sshd():
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
print('checking ssh connection to local host')
client.connect('127.0.0.1')
client.close()
print('finished')
希望这能解决您的问题。