71 lines
2.6 KiB
Python
71 lines
2.6 KiB
Python
import http.server
|
|
import json
|
|
import subprocess
|
|
import yaml
|
|
import os
|
|
import time
|
|
|
|
TOKEN = os.environ.get("WHITELIST_TOKEN", "changeme")
|
|
WHITELIST_PATH = "/config/whitelist-custom.yaml"
|
|
|
|
class WhitelistHandler(http.server.BaseHTTPRequestHandler):
|
|
def do_GET(self):
|
|
# Parse token from query string
|
|
from urllib.parse import urlparse, parse_qs
|
|
query = parse_qs(urlparse(self.path).query)
|
|
token = query.get("token", [""])[0]
|
|
|
|
if token != TOKEN:
|
|
self.send_response(403)
|
|
self.end_headers()
|
|
self.wfile.write(b"Forbidden")
|
|
return
|
|
|
|
# Get client IP (handle X-Forwarded-For if behind proxy)
|
|
client_ip = self.headers.get("X-Forwarded-For", self.client_address[0]).split(",")[0].strip()
|
|
|
|
# Read current whitelist
|
|
try:
|
|
with open(WHITELIST_PATH) as f:
|
|
wl = yaml.safe_load(f)
|
|
except:
|
|
wl = {"name": "custom/whitelist", "description": "Whitelist trusted IPs", "whitelist": {"reason": "trusted user IP", "ip": []}}
|
|
|
|
current_ips = wl.get("whitelist", {}).get("ip", [])
|
|
|
|
# Check if IP already whitelisted
|
|
if client_ip in current_ips:
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "application/json")
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps({"status": "already whitelisted", "ip": client_ip}).encode())
|
|
return
|
|
|
|
# Add IP, keep max 5 recent IPs
|
|
current_ips.append(client_ip)
|
|
if len(current_ips) > 5:
|
|
current_ips = current_ips[-5:]
|
|
wl["whitelist"]["ip"] = current_ips
|
|
|
|
# Write updated whitelist
|
|
with open(WHITELIST_PATH, "w") as f:
|
|
yaml.dump(wl, f, default_flow_style=False)
|
|
|
|
# Unban IP if currently banned
|
|
subprocess.run(["docker", "exec", "crowdsec", "cscli", "decisions", "delete", "--ip", client_ip], capture_output=True)
|
|
|
|
# Reload CrowdSec (HUP signal)
|
|
subprocess.run(["docker", "kill", "--signal=HUP", "crowdsec"], capture_output=True)
|
|
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "application/json")
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps({"status": "whitelisted", "ip": client_ip, "all_ips": current_ips}).encode())
|
|
|
|
def log_message(self, format, *args):
|
|
print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {self.client_address[0]} - {args[0]}")
|
|
|
|
if __name__ == "__main__":
|
|
server = http.server.HTTPServer(("0.0.0.0", 9200), WhitelistHandler)
|
|
print(f"Whitelist API running on :9200")
|
|
server.serve_forever()
|