import asyncio
import os
import subprocess

import asyncssh
|
|
|
class MySSHServerSession(asyncssh.SSHServerSession):
    """Bridge one SSH session channel to a local ``/bin/bash`` subprocess.

    AsyncSSH drives a session through plain (non-coroutine) callbacks and
    hands the channel over in ``connection_made``; every byte sent to the
    client goes through that channel. Client input arrives through
    ``data_received`` and is piped to bash's stdin, while a background task
    copies bash's stdout/stderr back to the client line by line.
    """

    def __init__(self):
        self._chan = None          # SSH channel, set in connection_made()
        self._process = None       # bash subprocess, set once it is spawned
        self._runner = None        # task driving the subprocess lifecycle
        self._pending_input = []   # bytes received before bash was spawned
        self._eof_seen = False     # client sent EOF before bash was spawned

    def connection_made(self, chan):
        """Remember the channel; it is the only way to write to the client."""
        self._chan = chan

    def shell_requested(self):
        """Accept interactive shell requests (the base class rejects them)."""
        return True

    def session_started(self):
        """Start the subprocess driver once the session is established.

        This callback is invoked synchronously, so the asynchronous work
        (spawning bash, pumping its output) is moved into a task.
        """
        self._runner = asyncio.create_task(self._run_shell())

    def data_received(self, data, datatype):
        """Forward client input to bash, buffering it until bash exists."""
        payload = data.encode() if isinstance(data, str) else data
        if self._process is None:
            # Spawning is asynchronous; keep early input until it finishes.
            self._pending_input.append(payload)
        else:
            self._forward_stdin(payload)

    def eof_received(self):
        """Propagate client EOF to bash and keep the channel half-open.

        Returns:
            True, so output produced after the client's EOF can still be
            delivered before the channel is closed.
        """
        self._eof_seen = True
        if self._process is not None:
            self._close_stdin()
        return True

    def connection_lost(self, exc):
        """Stop the driver task; its cleanup terminates and reaps bash."""
        if self._runner is not None:
            self._runner.cancel()

    async def _run_shell(self):
        """Spawn bash, relay its output, then report its exit status."""
        try:
            self._process = await asyncio.create_subprocess_exec(
                '/bin/bash',  # The shell or command to execute
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
        except OSError as exc:
            self._chan.write_stderr(f'Failed to start shell: {exc}\n')
            self._chan.exit(127)
            return

        try:
            # Replay whatever the client sent while bash was being spawned.
            for payload in self._pending_input:
                self._forward_stdin(payload)
            self._pending_input.clear()
            if self._eof_seen:
                self._close_stdin()

            await asyncio.gather(
                self._forward_stdout(),
                self._forward_stderr(),
            )
            status = await self._process.wait()
            # A negative return code means "killed by signal N"; map it to
            # the shell convention 128 + N because exit() needs an unsigned
            # value.
            self._chan.exit(status if status >= 0 else 128 - status)
        finally:
            # Reached on normal exit and on cancellation from
            # connection_lost(); make sure bash never outlives the session.
            if self._process.returncode is None:
                try:
                    self._process.terminate()
                except ProcessLookupError:
                    pass  # bash exited between the check and the signal
                await self._process.wait()

    def _forward_stdin(self, payload):
        """Write *payload* (bytes) to bash's stdin unless it is closed."""
        stdin = self._process.stdin
        if not stdin.is_closing():
            stdin.write(payload)

    def _close_stdin(self):
        """Close bash's stdin so it sees EOF after buffered input."""
        stdin = self._process.stdin
        if not stdin.is_closing():
            stdin.close()

    async def _forward_lines(self, stream, prefix=''):
        """Copy *stream* to the client line by line until it reaches EOF."""
        while True:
            line = await stream.readline()
            if not line:
                break
            # errors='replace' keeps the relay alive on non-UTF-8 output.
            self._chan.write(prefix + line.decode(errors='replace'))

    async def _forward_stdout(self):
        """Relay bash's stdout to the client unchanged."""
        await self._forward_lines(self._process.stdout)

    async def _forward_stderr(self):
        """Relay bash's stderr to the client, tagging each line."""
        await self._forward_lines(self._process.stderr, prefix='STDERR: ')
|
|
|
class MySSHServer(asyncssh.SSHServer):
    """Connection-level callbacks: log the peer and supply sessions.

    NOTE(review): no authentication callbacks are overridden in this class;
    confirm how clients are expected to authenticate before deploying.
    """

    def connection_made(self, conn):
        """Print the connection's ``peername`` when it is opened."""
        banner = f"Connection from {conn.get_extra_info('peername')}"
        print(banner)

    def session_requested(self):
        """Return a fresh ``MySSHServerSession`` for the requested session."""
        session = MySSHServerSession()
        return session
|
|
|
async def start_ssh_server(host='localhost', port=2222,
                           server_host_keys=('ssh_host_key',)):
    """Start the SSH listener and serve until the task is cancelled.

    Args:
        host: Interface to bind to.
        port: TCP port to listen on.
        server_host_keys: Paths of the private host key files to present;
            replace the default with your own private key.

    Raises:
        OSError: If the listening socket cannot be bound.
        asyncssh.Error: If AsyncSSH rejects the configuration or host keys.
    """
    server = await asyncssh.create_server(
        MySSHServer,
        host=host,
        port=port,
        server_host_keys=list(server_host_keys),
    )
    print(f'SSH Server running on port {port}')
    try:
        await asyncio.Event().wait()  # Keep the server running
    finally:
        # Release the listening socket when the task is cancelled
        # (e.g. on Ctrl-C) instead of leaking it until interpreter exit.
        server.close()
        await server.wait_closed()
|
|
|
def generate_host_key(path='ssh_host_key'):
    """Create an RSA host key at *path* unless a file already exists there.

    The key file is created with mode 0600 so the private key is never
    readable by other users, regardless of the process umask.

    Args:
        path: Destination of the private key file.

    Raises:
        OSError: If the key file cannot be created or written.
    """
    if os.path.exists(path):
        # Fast path: skip the (slow) RSA generation when a key is present.
        return

    key = asyncssh.generate_private_key('ssh-rsa')
    try:
        # O_EXCL closes the gap between the existence check and the write:
        # a key created concurrently by another process is never clobbered.
        fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
    except FileExistsError:
        return
    with os.fdopen(fd, 'wb') as key_file:
        key_file.write(key.export_private_key())
    print('Generated new host key.')
|
|
|
if __name__ == '__main__':
    # Script entry point: ensure a host key exists, then serve until stopped.
    try:
        # Key generation sits inside the try so that a filesystem error is
        # reported like any other startup failure instead of as a traceback.
        generate_host_key()
        asyncio.run(start_ssh_server())
    except (OSError, asyncssh.Error) as exc:
        print(f'Failed to start server: {exc}')
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the server; exit quietly.
        pass
|