New paste Repaste Download
import asyncio
import asyncssh
import subprocess
class MySSHServerSession(asyncssh.SSHServerSession):
    def __init__(self):
        self._process = None
    async def session_started(self):
        self._process = await asyncio.create_subprocess_exec(
            '/bin/bash',  # The shell or command to execute
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        self._stdin_task = asyncio.create_task(self._forward_stdin())
        self._stdout_task = asyncio.create_task(self._forward_stdout())
        self._stderr_task = asyncio.create_task(self._forward_stderr())
    async def _forward_stdin(self):
        while True:
            data = await self.read(1024)
            if not data:
                break
            self._process.stdin.write(data.encode())
            await self._process.stdin.drain()
    async def _forward_stdout(self):
        while True:
            line = await self._process.stdout.readline()
            if not line:
                break
            self.write(line.decode())
    async def _forward_stderr(self):
        while True:
            line = await self._process.stderr.readline()
            if not line:
                break
            self.write(f"STDERR: {line.decode()}")
    async def connection_lost(self, exc):
        if self._process:
            self._process.terminate()
            await self._process.wait()
class MySSHServer(asyncssh.SSHServer):
    def connection_made(self, conn):
        print(f"Connection from {conn.get_extra_info('peername')}")
    def session_requested(self):
        return MySSHServerSession()
async def start_ssh_server():
    await asyncssh.create_server(
        MySSHServer,
        host='localhost',
        port=2222,
        server_host_keys=['ssh_host_key'],  # Replace with your private key
    )
    print('SSH Server running on port 2222')
    await asyncio.Event().wait()  # Keep the server running
def generate_host_key():
    import os
    if not os.path.exists('ssh_host_key'):
        key = asyncssh.generate_private_key('ssh-rsa')
        key.write_private_key_file('ssh_host_key')
        print('Generated new host key.')
if __name__ == '__main__':
    generate_host_key()
    try:
        asyncio.run(start_ssh_server())
    except (OSError, asyncssh.Error) as exc:
        print(f'Failed to start server: {exc}')
Filename: None. Size: 2kb. View raw, , hex, or download this file.

This paste expires on 2025-01-26 00:25:12.572367. Pasted through web.