Initial commit: TS3 raw query to TS6 SSH query translation proxy

Bridges the legacy TeamSpeak 3 ServerQuery raw protocol (TCP 10011) to
TeamSpeak 6's SSH Query interface (TCP 10022). Includes unit tests,
Docker setup, and integration test scaffold.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Joshua Hirsig 2026-03-14 17:33:03 +01:00
commit 50ba7135a7
7 changed files with 516 additions and 0 deletions

10
.gitignore vendored Normal file
View file

@ -0,0 +1,10 @@
__pycache__/
*.pyc
.venv/
venv/
*.egg-info/
dist/
build/
.pytest_cache/
.mypy_cache/
.env

10
Dockerfile Normal file
View file

@ -0,0 +1,10 @@
FROM python:3.12-slim
RUN pip install --no-cache-dir asyncssh
COPY proxy.py /app/proxy.py
WORKDIR /app
EXPOSE 10011
CMD ["python", "-u", "proxy.py"]

21
LICENSE Normal file
View file

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2026 Joshua Hirsig
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

59
README.md Normal file
View file

@ -0,0 +1,59 @@
# ts3-query-proxy
A translation layer that bridges the legacy TeamSpeak 3 ServerQuery raw protocol (TCP port 10011) to TeamSpeak 6's SSH Query interface (TCP port 10022).
## Why?
TeamSpeak 6 dropped support for the raw ServerQuery protocol on port 10011. Many existing tools (like TS3MusicBot) rely on this protocol. This proxy sits in front of TS6 and transparently translates raw query connections to SSH query sessions, requiring zero changes to existing tools.
## How it works
```
TS3MusicBot ──TCP:10011──▶ ts3-query-proxy ──SSH:10022──▶ TeamSpeak 6
(raw query) (ssh query)
```
1. Client connects to the proxy on TCP port 10011
2. Proxy sends the standard TS3 ServerQuery welcome banner
3. Client sends `login serveradmin <password>`
4. Proxy captures credentials and opens an SSH session to TS6
5. All subsequent commands are proxied bidirectionally
## Quick start (Docker)
```bash
docker compose up -d
```
The proxy expects a TeamSpeak 6 server reachable at hostname `teamspeak6` on port `10022` (SSH query). Adjust via environment variables:
| Variable | Default | Description |
|----------|---------|-------------|
| `TS6_HOST` | `teamspeak6` | TS6 server hostname |
| `TS6_SSH_PORT` | `10022` | TS6 SSH query port |
| `LISTEN_PORT` | `10011` | Port the proxy listens on |
## Docker Compose example
```yaml
services:
ts3query-proxy:
image: ghcr.io/yourusername/ts3-query-proxy:latest
# or: build: .
restart: unless-stopped
ports:
- "10011:10011"
environment:
- TS6_HOST=teamspeak6
- TS6_SSH_PORT=10022
networks:
- your_ts6_network
networks:
your_ts6_network:
external: true
```
## License
MIT

16
docker-compose.yaml Normal file
View file

@ -0,0 +1,16 @@
services:
ts3query-proxy:
build: .
container_name: ts3query-proxy
restart: unless-stopped
ports:
- "10011:10011"
environment:
- TS6_HOST=teamspeak6
- TS6_SSH_PORT=10022
networks:
- teamspeak_default
networks:
teamspeak_default:
external: true

209
proxy.py Normal file
View file

@ -0,0 +1,209 @@
import asyncio
import os
import logging
import sys
import asyncssh
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
stream=sys.stdout,
)
log = logging.getLogger("ts3query-proxy")
TS6_HOST = os.environ.get("TS6_HOST", "teamspeak6")
TS6_SSH_PORT = int(os.environ.get("TS6_SSH_PORT", "10022"))
LISTEN_HOST = "0.0.0.0"
LISTEN_PORT = int(os.environ.get("LISTEN_PORT", "10011"))
TS3_BANNER = (
b"TS3\n\r"
b"Welcome to the TeamSpeak 3 ServerQuery interface, "
b'type "help" for a list of commands and "help <command>" '
b"for information on a specific command.\n\r"
)
class ClientHandler:
def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
self.reader = reader
self.writer = writer
self.ssh_conn: asyncssh.SSHClientConnection | None = None
self.ssh_process: asyncssh.SSHClientProcess | None = None
self.addr = writer.get_extra_info("peername")
async def handle(self) -> None:
log.info("Client connected from %s", self.addr)
try:
self.writer.write(TS3_BANNER)
await self.writer.drain()
await self._auth_loop()
except asyncio.TimeoutError:
log.warning("Client %s timed out", self.addr)
except (ConnectionResetError, BrokenPipeError):
log.info("Client %s disconnected abruptly", self.addr)
except Exception as e:
log.error("Error handling %s: %s", self.addr, e)
finally:
await self._cleanup()
async def _auth_loop(self) -> None:
while True:
line = await asyncio.wait_for(self.reader.readline(), timeout=30)
if not line:
return
cmd = line.decode("utf-8", errors="replace").strip()
if not cmd:
continue
log.info("Pre-auth from %s: %s", self.addr, _sanitize(cmd))
if cmd.lower().startswith("login "):
await self._handle_login(cmd)
return
elif cmd.lower() == "quit":
self.writer.write(b"error id=0 msg=ok\n\r")
await self.writer.drain()
return
else:
self.writer.write(b"error id=1794 msg=not\\slogged\\sin\n\r")
await self.writer.drain()
async def _handle_login(self, cmd: str) -> None:
parts = cmd.split(maxsplit=2)
if len(parts) < 3:
self.writer.write(b"error id=256 msg=command\\snot\\sfound\n\r")
await self.writer.drain()
return
username, password = parts[1], parts[2]
if await self._connect_ssh(username, password):
self.writer.write(b"error id=0 msg=ok\n\r")
await self.writer.drain()
await self._proxy()
else:
self.writer.write(
b"error id=520 msg=invalid\\sloginname\\sor\\spassword\n\r"
)
await self.writer.drain()
async def _connect_ssh(self, username: str, password: str) -> bool:
try:
log.info("SSH connecting to %s:%d as %s", TS6_HOST, TS6_SSH_PORT, username)
self.ssh_conn = await asyncio.wait_for(
asyncssh.connect(
TS6_HOST,
TS6_SSH_PORT,
username=username,
password=password,
known_hosts=None,
),
timeout=10,
)
self.ssh_process = await self.ssh_conn.create_process(term_type=None)
# Read and discard the TS6 SSH banner
banner = ""
while True:
try:
chunk = await asyncio.wait_for(
self.ssh_process.stdout.read(4096), timeout=3
)
if not chunk:
break
banner += chunk
if "help" in banner.lower():
break
except asyncio.TimeoutError:
break
log.info("SSH session established for %s", self.addr)
return True
except asyncssh.PermissionDenied:
log.warning("SSH auth failed for %s (bad credentials)", self.addr)
return False
except Exception as e:
log.error("SSH connection to %s:%d failed: %s", TS6_HOST, TS6_SSH_PORT, e)
return False
async def _proxy(self) -> None:
log.info("Proxying traffic for %s", self.addr)
async def client_to_ssh() -> None:
try:
while True:
line = await self.reader.readline()
if not line:
break
self.ssh_process.stdin.write(
line.decode("utf-8", errors="replace")
)
except (ConnectionResetError, BrokenPipeError):
pass
async def ssh_to_client() -> None:
try:
while True:
data = await self.ssh_process.stdout.read(4096)
if not data:
break
raw = data.encode() if isinstance(data, str) else data
self.writer.write(raw)
await self.writer.drain()
except (ConnectionResetError, BrokenPipeError):
pass
tasks = [
asyncio.create_task(client_to_ssh()),
asyncio.create_task(ssh_to_client()),
]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for t in pending:
t.cancel()
# Await cancelled tasks to suppress warnings
for t in pending:
try:
await t
except asyncio.CancelledError:
pass
async def _cleanup(self) -> None:
try:
if self.ssh_process:
self.ssh_process.close()
if self.ssh_conn:
self.ssh_conn.close()
self.writer.close()
await self.writer.wait_closed()
except Exception:
pass
log.info("Client %s disconnected", self.addr)
def _sanitize(cmd: str) -> str:
"""Hide password in login commands for logging."""
if cmd.lower().startswith("login "):
parts = cmd.split(maxsplit=2)
if len(parts) >= 3:
return f"login {parts[1]} ***"
return cmd
async def handle_client(
reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
handler = ClientHandler(reader, writer)
await handler.handle()
async def main() -> None:
server = await asyncio.start_server(handle_client, LISTEN_HOST, LISTEN_PORT)
log.info("TS3 Query Proxy listening on %s:%d", LISTEN_HOST, LISTEN_PORT)
log.info("Forwarding to %s:%d (SSH Query)", TS6_HOST, TS6_SSH_PORT)
async with server:
await server.serve_forever()
if __name__ == "__main__":
asyncio.run(main())

191
tests/test_proxy.py Normal file
View file

@ -0,0 +1,191 @@
"""Tests for ts3-query-proxy.
Unit tests mock the SSH layer; integration tests require a real TS6 instance.
Set TS6_TEST_HOST, TS6_TEST_SSH_PORT, TS6_TEST_USER, TS6_TEST_PASS env vars
to enable integration tests.
"""
import asyncio
import os
import unittest
from unittest.mock import AsyncMock, MagicMock, patch
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from proxy import ClientHandler, TS3_BANNER, _sanitize
class TestSanitize(unittest.TestCase):
def test_hides_password(self):
self.assertEqual(_sanitize("login serveradmin secret123"), "login serveradmin ***")
def test_ignores_non_login(self):
self.assertEqual(_sanitize("serverinfo"), "serverinfo")
def test_short_login(self):
self.assertEqual(_sanitize("login serveradmin"), "login serveradmin")
class TestBanner(unittest.TestCase):
def test_banner_starts_with_ts3(self):
self.assertTrue(TS3_BANNER.startswith(b"TS3"))
def test_banner_contains_help(self):
self.assertIn(b"help", TS3_BANNER)
class TestClientHandlerUnit(unittest.IsolatedAsyncioTestCase):
def _make_handler(self, input_lines: list[bytes]):
reader = AsyncMock(spec=asyncio.StreamReader)
reader.readline = AsyncMock(side_effect=input_lines)
writer = MagicMock(spec=asyncio.StreamWriter)
writer.write = MagicMock()
writer.drain = AsyncMock()
writer.close = MagicMock()
writer.wait_closed = AsyncMock()
writer.get_extra_info = MagicMock(return_value=("127.0.0.1", 12345))
handler = ClientHandler(reader, writer)
return handler, writer
async def test_sends_banner_on_connect(self):
handler, writer = self._make_handler([b"quit\n"])
await handler.handle()
first_write = writer.write.call_args_list[0][0][0]
self.assertEqual(first_write, TS3_BANNER)
async def test_quit_before_login(self):
handler, writer = self._make_handler([b"quit\n"])
await handler.handle()
writes = [call[0][0] for call in writer.write.call_args_list]
self.assertIn(b"error id=0 msg=ok\n\r", writes)
async def test_command_before_login_returns_not_logged_in(self):
handler, writer = self._make_handler([b"serverinfo\n", b"quit\n"])
await handler.handle()
writes = [call[0][0] for call in writer.write.call_args_list]
self.assertIn(b"error id=1794 msg=not\\slogged\\sin\n\r", writes)
async def test_empty_disconnect(self):
handler, writer = self._make_handler([b""])
await handler.handle()
# Should not raise, just disconnect
@patch("proxy.asyncssh")
async def test_login_bad_credentials(self, mock_asyncssh):
import asyncssh as real_asyncssh
mock_asyncssh.PermissionDenied = real_asyncssh.PermissionDenied
mock_asyncssh.connect = AsyncMock(
side_effect=real_asyncssh.PermissionDenied("bad password")
)
handler, writer = self._make_handler([b"login serveradmin wrongpw\n"])
await handler.handle()
writes = [call[0][0] for call in writer.write.call_args_list]
self.assertIn(
b"error id=520 msg=invalid\\sloginname\\sor\\spassword\n\r", writes
)
async def test_login_incomplete_command(self):
handler, writer = self._make_handler([b"login serveradmin\n", b"quit\n"])
await handler.handle()
writes = [call[0][0] for call in writer.write.call_args_list]
self.assertIn(b"error id=256 msg=command\\snot\\sfound\n\r", writes)
# --- Integration tests (only run when env vars are set) ---
TS6_TEST_HOST = os.environ.get("TS6_TEST_HOST")
TS6_TEST_SSH_PORT = int(os.environ.get("TS6_TEST_SSH_PORT", "10022"))
TS6_TEST_USER = os.environ.get("TS6_TEST_USER", "serveradmin")
TS6_TEST_PASS = os.environ.get("TS6_TEST_PASS")
PROXY_TEST_PORT = int(os.environ.get("PROXY_TEST_PORT", "19011"))
@unittest.skipUnless(
TS6_TEST_HOST and TS6_TEST_PASS,
"Set TS6_TEST_HOST and TS6_TEST_PASS to run integration tests",
)
class TestIntegration(unittest.IsolatedAsyncioTestCase):
"""Integration tests that start a real proxy and connect through it."""
async def asyncSetUp(self):
os.environ["TS6_HOST"] = TS6_TEST_HOST
os.environ["TS6_SSH_PORT"] = str(TS6_TEST_SSH_PORT)
# Re-import to pick up env vars
import importlib
import proxy as proxy_mod
importlib.reload(proxy_mod)
self.server = await asyncio.start_server(
proxy_mod.handle_client, "127.0.0.1", PROXY_TEST_PORT
)
async def asyncTearDown(self):
self.server.close()
await self.server.wait_closed()
async def _connect(self):
reader, writer = await asyncio.open_connection("127.0.0.1", PROXY_TEST_PORT)
# Read banner
banner = await asyncio.wait_for(reader.read(4096), timeout=5)
self.assertIn(b"TS3", banner)
return reader, writer
async def test_login_and_version(self):
reader, writer = await self._connect()
writer.write(f"login {TS6_TEST_USER} {TS6_TEST_PASS}\n".encode())
await writer.drain()
resp = await asyncio.wait_for(reader.readline(), timeout=10)
self.assertIn(b"id=0", resp)
writer.write(b"version\n")
await writer.drain()
# version response + error line
data = b""
for _ in range(10):
chunk = await asyncio.wait_for(reader.readline(), timeout=5)
data += chunk
if b"error id=" in chunk:
break
self.assertIn(b"error id=0", data)
writer.write(b"quit\n")
await writer.drain()
writer.close()
async def test_login_wrong_password(self):
reader, writer = await self._connect()
writer.write(b"login serveradmin definitelywrong\n")
await writer.drain()
resp = await asyncio.wait_for(reader.readline(), timeout=10)
self.assertIn(b"id=520", resp)
writer.close()
async def test_serverinfo(self):
reader, writer = await self._connect()
writer.write(f"login {TS6_TEST_USER} {TS6_TEST_PASS}\n".encode())
await writer.drain()
resp = await asyncio.wait_for(reader.readline(), timeout=10)
self.assertIn(b"id=0", resp)
writer.write(b"serverinfo\n")
await writer.drain()
data = b""
for _ in range(10):
chunk = await asyncio.wait_for(reader.readline(), timeout=5)
data += chunk
if b"error id=" in chunk:
break
self.assertIn(b"virtualserver_name=", data)
writer.write(b"quit\n")
await writer.drain()
writer.close()
if __name__ == "__main__":
unittest.main()