我目前在后端使用 5.2.1 版本的 xterm.js 和 django。我想检测文本编辑器何时在终端中运行并记录它。我目前的方法是使用 nano、vim 等关键字对其进行过滤。唯一的问题是用户可能使用别名,所以我想不断在后台运行“alias”命令并检查是否有 nano 或vim 并将它们也添加到黑名单中。 “alias”命令必须在从 connect 函数运行的 bash 进程中运行,否则它将是另一个终端实例。此外,用户不能在浏览器中看到该命令,因此不能选择使用 os.write() 。我怎样才能以最简单的方式实现这一目标?
index.html:
<script type="module">
var socket = io.connect({ transports: ["websocket", "polling"] });
const status = document.getElementById("status")
const button = document.getElementById("button")
const fit = new FitAddon.FitAddon();
var term = new Terminal({
cursorBlink: true,
});
term.loadAddon(fit);
term.open(document.getElementById('terminal'));
fit.fit();
var terminal_line = '';
term.onKey(e => {
if (e.key == "\r") {
terminal_line = term.buffer.active.getLine(term._core.buffer.y)?.translateToString();
console.log("terminal line: ", terminal_line);
socket.emit("log_input", { "user_input": terminal_line });
}
socket.emit("pty_input", { "input": e.key });
})
socket.on("pty_output", function (output) {
console.log("output: ", output["output"]);
term.write(output["output"]);
})
socket.on("connect", () => {
status.innerHTML = '<span style="background-color: lightgreen;">connected</span>'
button.innerHTML = 'Disconnect'
})
socket.on("disconnect", () => {
status.innerHTML = '<span style="background-color: #ff8383;">disconnected</span>'
button.innerHTML = 'Connect'
})
function myFunction() {
if (button.innerHTML == 'Connect') {
location.reload();
}
else if (button.innerHTML == "Disconnect") {
socket.emit("disconnect_request")
}
}
function resize() {
console.log("resized")
fit.fit()
socket.emit("resize", { "cols": term.cols, "rows": term.rows })
}
window.onresize = resize
window.onload = resize
</script>
views.py:
import os
from django.shortcuts import render
import socketio
import pty
import select
import subprocess
import struct
import fcntl
import termios
import signal
import time
async_mode = "eventlet"
sio = socketio.Server(async_mode=async_mode)
fd = None
child_pid = None
def index(request):
return render(request, "index.html")
def set_winsize(fd, row, col, xpix=0, ypix=0):
winsize = struct.pack("HHHH", row, col, xpix, ypix)
fcntl.ioctl(fd, termios.TIOCSWINSZ, winsize)
def read_and_forward_pty_output():
global fd
max_read_bytes = 1024 * 20
while True:
sio.sleep(0.01)
if fd:
timeout_sec = 0
(data_ready, _, _) = select.select([fd], [], [], timeout_sec)
if data_ready:
output = os.read(fd, max_read_bytes).decode()
sio.emit("pty_output", {"output": output})
else:
print("process killed")
return
@sio.event
def resize(sid, message):
print("entered resize")
if fd:
set_winsize(fd, message["rows"], message["cols"])
@sio.event
def pty_input(sid, message):
if fd:
os.write(fd, message["input"].encode())
@sio.event
def log_input(sid, user_input):
with open("userinput.log", "a") as f:
f.write(user_input["user_input"]+"\n")
def extract_path(raw_input):
if "/" not in raw_input:
filename = raw_input.split()[-1]
else:
filename = raw_input.split()[-1][raw_input.split()[-1].rfind("/")+1:]
path = os.getcwd()+"/"+filename
print("Filename: ",filename)
print(os.path.abspath(path))
@sio.event
def disconnect_request(sid):
sio.disconnect(sid)
@sio.event
def connect(sid, environ):
global fd
global child_pid
if child_pid:
os.write(fd, "\n".encode())
return
(child_pid, fd) = pty.fork()
if child_pid == 0:
subprocess.run("bash")
subprocess.run("clear")
else:
sio.start_background_task(target=read_and_forward_pty_output)
@sio.event
def disconnect(sid):
global fd
global child_pid
os.kill(child_pid, signal.SIGKILL)
os.wait()
fd = None
child_pid = None
print("Client disconnected")
所以我在这里修改了你的代码。尝试以下操作:
创建一个函数来检查别名: 编写一个名为 check_aliases 的函数,它将发送别名命令到终端,读取输出,并查找 nano 或 vim 的别名。
更新 pty_input 事件处理程序: 更新 pty_input 事件处理程序以检查输入的命令是否在黑名单别名列表中,如果找到则记录它。
定期运行 check_aliases 函数: 启动后台任务定期运行 check_aliases 函数,以保持黑名单别名列表的更新。
import os
import re
import socketio
import pty
import select
import subprocess
import struct
import fcntl
import termios
import signal
# ... other code remains unchanged ...
# Global variable to store blacklisted aliases
blacklist_aliases = set()
def check_aliases():
global fd
global blacklist_aliases
while True:
sio.sleep(60) # sleep for 60 seconds
if fd:
os.write(fd, "alias\n".encode())
output = os.read(fd, 1024).decode()
# Regular expression to match aliases for nano, vim, etc.
matches = re.findall(r"alias (\w+)='(nano|vim)'", output)
for match in matches:
alias_name, _ = match
blacklist_aliases.add(alias_name)
@sio.event
def pty_input(sid, message):
global blacklist_aliases
if fd:
os.write(fd, message["input"].encode())
input_command = message["input"].strip()
# Check if the input command is in the blacklist_aliases
if input_command in blacklist_aliases:
with open("userinput.log", "a") as f:
f.write(f"Alias used for text editor: {input_command}\n")
# ... other code remains unchanged ...
# Start the check_aliases function as a background task
sio.start_background_task(target=check_aliases)
这里有更多解释: check_aliases 函数在后台持续运行,每次检查之间休眠 60 秒。它将别名命令发送到终端,读取输出,并使用正则表达式查找 nano 和 vim 的别名。如果找到,这些别名将添加到 blacklist_aliases 集中。 pty_input 事件处理程序已更新,以检查输入的命令是否在 blacklist_aliases 集中。如果是,则别名使用情况会记录在 userinput.log 文件中。 check_aliases函数在服务器启动时作为后台任务启动,确保黑名单不断更新。 注意:您可能需要根据您的偏好调整 check_aliases 函数中的睡眠时间。另外,请考虑处理不同函数中 os.read 之间的潜在冲突。