236 lines
9.4 KiB
Python
236 lines
9.4 KiB
Python
import socket
|
||
import threading
|
||
import os
|
||
import mimetypes
|
||
from urllib.parse import unquote
|
||
from html import escape
|
||
import io
|
||
import sys
|
||
from contextlib import redirect_stdout
|
||
import re
|
||
import traceback
|
||
import linecache
|
||
|
||
|
||
class pyp_Functions:
|
||
@staticmethod
|
||
def e(text: str | int | float, ignoreType=False):
|
||
if isinstance(text, (str, int, float)):
|
||
print(text)
|
||
elif ignoreType:
|
||
print(escape(str(text)))
|
||
else:
|
||
raise ValueError(
|
||
f"Unsupported type for echo function,(req str or number, got {str(type(text))}) use ignoreType=True to force str conversion")
|
||
|
||
@staticmethod
|
||
def f_r(path: str):
|
||
with open(path, "r", encoding="utf-8", errors="ignore") as f:
|
||
return f.read()
|
||
|
||
|
||
class pyp_IO:
    """Per-request context object (request path, query string, parsed GET).

    Attributes are plain data:
      * ``path``  - the request path as given to the constructor.
      * ``query`` - the raw query string as given to the constructor.
      * ``GET``   - dict of percent-decoded query parameters.
      * ``server_pipeline`` - dict of instructions for the server; currently
        only the ``'Redirect'`` key, ``False`` until ``redirect()`` is called.
    """

    def __init__(self, path, query) -> None:
        """Store the request data and parse ``query`` into ``self.GET``.

        ``query`` is split on ``&``; each non-empty segment is split on the
        first ``=``.  A segment without ``=`` maps its key to ``''``.  When a
        key repeats, the last value wins.
        """
        self.path = path
        self.query = query

        self.server_pipeline: dict[str, bool | str | None | int] = {
            'Redirect': False}

        self.GET = {}
        if query:
            for pair in query.split('&'):
                if not pair:
                    # "a=1&&b=2" or a trailing "&" yields empty segments;
                    # skip them instead of registering a bogus '' key.
                    continue
                key, _, value = pair.partition('=')
                self.GET[unquote(key)] = unquote(value)

    def redirect(self, url: str):
        """Record ``url`` as the requested redirect target."""
        self.server_pipeline['Redirect'] = url
        # TODO: Implement redirect logic in server
|
||
|
||
|
||
class SandboxInstance:
    """Run script snippets with a shared globals dict and captured stdout.

    NOTE(review): despite the name this is not a security boundary - the
    snippet is passed to ``exec`` with unrestricted builtins, so only trusted
    scripts must be executed through it.
    """

    # run() temporarily replaces process-wide state (current directory,
    # sys.path, sys.stdout).  Overlapping runs from different threads would
    # restore that state in the wrong order (e.g. leave sys.stdout pointing at
    # another run's buffer), so runs are serialised.  Re-entrant so a script
    # that triggers a nested run on the same thread does not deadlock.
    _run_lock = threading.RLock()

    def __init__(self, initial_globals=None):
        """Create the globals namespace, optionally seeded from a mapping."""
        # Base globals, plus any extras supplied by the caller.
        self.globals_dict = {}
        if initial_globals:
            self.globals_dict.update(initial_globals)

    def run(self, script_str, path, additional_globals=None):
        """Execute ``script_str`` and return what it printed.

        Args:
            script_str: Python source to execute.
            path: Path of the page the snippet belongs to; its directory
                becomes the working directory and is put first on
                ``sys.path`` for the duration of the run.
            additional_globals: Optional mapping merged into the persistent
                globals before execution.

        Returns:
            The captured standard output with surrounding whitespace
            stripped, or - when the script raises an ``Exception`` - an HTML
            fragment describing the error (captured output is discarded).
        """
        if additional_globals:
            self.globals_dict.update(additional_globals)

        fake_filename = "<pyp_script>"
        script_dir = os.path.dirname(path)
        output_buffer = io.StringIO()

        with self._run_lock:
            original_dir = os.getcwd()
            original_sys_path = sys.path.copy()
            original_dont_write = sys.dont_write_bytecode
            sys.dont_write_bytecode = True
            # Register the source under the fake filename so tracebacks can
            # show the offending script line.
            linecache.cache[fake_filename] = (
                len(script_str), None, script_str.splitlines(True),
                fake_filename)
            try:
                with redirect_stdout(output_buffer):
                    # Switch to the script's directory.
                    os.chdir(script_dir)
                    # Add the script's directory to sys.path.
                    sys.path.insert(0, script_dir)
                    exec(compile(script_str, fake_filename, 'exec'), self.globals_dict)
            except Exception as exc:
                return self._format_error(exc, fake_filename)
            finally:
                os.chdir(original_dir)
                sys.path = original_sys_path
                sys.dont_write_bytecode = original_dont_write
                linecache.cache.pop(fake_filename, None)

        return output_buffer.getvalue().strip()

    @staticmethod
    def _format_error(exc, fake_filename):
        """Render ``exc`` and its filtered traceback as an HTML fragment."""
        tb_lines = traceback.format_exception(
            type(exc), exc, exc.__traceback__)
        # Drop entries that mention "exec" (the frame of the exec() call
        # itself) unless they belong to the user's script.
        kept = [
            line for line in tb_lines
            if "exec" not in line or fake_filename in line]
        # Built outside the f-string: backslashes inside f-string expressions
        # are a SyntaxError before Python 3.12.  Spaces and tabs become
        # non-breaking spaces so the traceback keeps its indentation in HTML.
        tb_html = (
            escape(''.join(kept))
            .replace('\n', '<br>')
            .replace(' ', '&nbsp;')
            .replace('\t', '&nbsp;' * 4))
        return (
            '<div style="font-size:medium;font-family:sans-serif;">'
            'error while running script: <span style="font-family:monospace;'
            'background:black;color:white;padding:4px;border-radius:5px;margin:0 10px;'
            f'">{escape(str(exc))}</span> 💥<br><div style="'
            'font-family:monospace;background:black;color:white;border-radius:15px;'
            f'padding:10px;margin:10px 0;">{tb_html}</div><br><br></div>'
        )
|
||
|
||
|
||
def normalize_indentation(s):
    """Strip the common leading-space indent from a script snippet.

    The indent width is taken from the first line that contains anything
    other than spaces and is removed from every such line; lines made only
    of spaces become empty.  Only space characters count as indentation.

    Returns:
        The dedented source joined with ``\\n``; ``''`` for empty input.  If
        a later line is indented less than the reference line, a one-line
        ``raise ValueError(...)`` statement is returned instead so the error
        surfaces when the result is executed (the reported index is 0-based).
    """
    if not s:
        return ''

    def leading_spaces(text):
        return len(text) - len(text.lstrip(' '))

    rows = s.splitlines()

    # Reference width: indent of the first non-blank row (0 when all blank).
    base = 0
    for row in rows:
        if row.lstrip(' '):
            base = leading_spaces(row)
            break

    dedented = []
    for index, row in enumerate(rows):
        if not row.lstrip(' '):
            dedented.append('')
            continue
        if leading_spaces(row) < base:
            # raise in sandbox
            return f"raise ValueError(\"Indentation error at line {index}\")"
        dedented.append(row[base:])
    return '\n'.join(dedented)
|
||
|
||
|
||
class PyWPRServer:
    """Small threaded HTTP server for static files and ``.pyp`` templates.

    Files are served from ``doc_root``.  A ``.pyp`` file is a text template in
    which ``<py!>...</py!>`` blocks are executed as Python and replaced by
    their printed output, and ``<p?>expr</p?>`` is replaced by the output of
    ``e(expr)``.  Only ``GET`` is handled and each connection is closed after
    a single response.
    """

    def __init__(self, host='0.0.0.0', port=8000, doc_root='www'):
        """Remember the bind address and make sure ``doc_root`` exists."""
        self.host, self.port = host, port
        self.doc_root = os.path.abspath(doc_root)
        os.makedirs(self.doc_root, exist_ok=True)
        self.sock = None
        self._stop = False

    def _response(self, status_line, headers=None, body=b''):
        """Serialise a complete HTTP response to bytes.

        Args:
            status_line: e.g. ``'HTTP/1.1 200 OK'``.
            headers: Optional header mapping; it is copied, never modified.
            body: ``bytes``, or ``str`` which is encoded as UTF-8.

        ``Content-Type``, ``Content-Length`` and ``Connection: close`` are
        filled in unless the caller supplied them.
        """
        # Copy so the caller's dict is not mutated by the defaults below.
        headers = dict(headers or {})
        if isinstance(body, str):
            body = body.encode('utf-8')
        headers.setdefault('Content-Type', 'text/html; charset=utf-8')
        headers.setdefault('Content-Length', str(len(body)))
        # _handle() closes the socket after one response; say so explicitly.
        headers.setdefault('Connection', 'close')
        hdrs = '\r\n'.join(f'{k}: {v}' for k, v in headers.items())
        return f'{status_line}\r\n{hdrs}\r\n\r\n'.encode('utf-8') + body

    def _not_found(self):
        """Return the canned 404 response."""
        return self._response('HTTP/1.1 404 Not Found', body=b'<h1>404 Not Found</h1>')

    def _execute_pyp(self, body: str, path: str, query: str):
        """Render a ``.pyp`` template and return the resulting text.

        All ``<py!>`` blocks run first (in document order), then all ``<p?>``
        expressions, sharing one sandbox.  Tags are located once in the
        original template: text produced by a script is inserted verbatim and
        never scanned for further tags, so request data echoed by a page
        cannot be executed as code.
        """
        script_path = os.path.normpath(
            os.path.join(self.doc_root, path.lstrip('/')))
        sandbox = SandboxInstance(
            {'e': pyp_Functions.e, 'f_r': pyp_Functions.f_r, "__file__": script_path, "__name__": "__pyp__", "pyp": pyp_IO(path, query)})

        tag_pattern = re.compile(
            r'<py!>(.*?)</py!>|<p\?>(.*?)</p\?>', re.DOTALL)
        matches = list(tag_pattern.finditer(body))

        rendered = {}  # match start offset -> replacement text
        for match in matches:
            code = match.group(1)
            if code is not None:
                rendered[match.start()] = sandbox.run(
                    normalize_indentation(code), path=script_path)
        for match in matches:
            expr = match.group(2)
            if expr is not None:
                rendered[match.start()] = sandbox.run(
                    f'e({expr})', path=script_path)

        # Same pattern on the same text yields the same matches, so every
        # offset is present.  A callable replacement is inserted literally.
        return tag_pattern.sub(lambda m: rendered[m.start()], body)

    def _serve_path(self, req_path):
        """Build the response for request target ``req_path`` (path?query).

        Directory requests map to ``index.pyp``.  Anything that resolves
        outside ``doc_root`` or is not a regular file yields 404; unexpected
        errors while reading or rendering yield 500.
        """
        raw_path, _, query = req_path.partition('?')
        path = unquote(raw_path)
        root = self.doc_root

        if path.endswith('/'):
            path += 'index.pyp'
        elif os.path.isdir(os.path.join(root, path.lstrip('/'))):
            path += '/index.pyp'

        full = os.path.normpath(os.path.join(root, path.lstrip('/')))
        # Require a separator boundary: a bare prefix test would also accept
        # siblings such as "<root>-secret/...".
        inside_root = (
            full == root or full.startswith(root.rstrip(os.sep) + os.sep))
        if not inside_root or not os.path.isfile(full):
            return self._not_found()

        is_pyp = path.endswith('.pyp')
        if is_pyp:
            ctype = 'text/html'
        else:
            ctype = mimetypes.guess_type(full)[0] or 'application/octet-stream'

        # Textual types are read as UTF-8 and advertised with a charset.
        is_text = ctype.startswith('text/') or ctype in (
            'application/json', 'application/javascript')
        if is_text:
            ctype = f'{ctype}; charset=utf-8'

        try:
            if is_text:
                with open(full, 'r', encoding='utf-8', errors='replace') as f:
                    body = f.read()
                if is_pyp:
                    body = self._execute_pyp(body, path, query)
                return self._response('HTTP/1.1 200 OK', headers={'Content-Type': ctype, 'Server': 'PyWRP-server'}, body=body.encode())
            with open(full, 'rb') as f:
                body = f.read()
            return self._response('HTTP/1.1 200 OK', headers={'Content-Type': ctype}, body=body)
        except Exception:
            # Top-level boundary for one request: report instead of hiding.
            traceback.print_exc()
            return self._response('HTTP/1.1 500 Internal Server Error', body=b'<h1>500</h1>')

    def _handle(self, client, addr):
        """Serve one request on ``client`` and close the connection.

        Only the first 4096 bytes are read and only the request line is
        inspected.  ``addr`` is accepted for the thread target signature but
        not used.
        """
        try:
            data = client.recv(4096).decode('utf-8', errors='replace')
            if not data:
                return
            first = data.splitlines()[0]
            parts = first.split()
            if len(parts) < 2:
                client.sendall(self._response(
                    'HTTP/1.1 400 Bad Request', body=b'<h1>400</h1>'))
                return
            method, path = parts[0], parts[1]
            if method != 'GET':
                client.sendall(self._response(
                    'HTTP/1.1 405 Method Not Allowed', headers={'Allow': 'GET'}, body=b'<h1>405</h1>'))
                return
            client.sendall(self._serve_path(path))
        finally:
            client.close()

    def start(self):
        """Bind, listen and serve until ``stop()`` is called (blocking).

        Each accepted connection is handled on its own daemon thread.  The
        listening socket is closed when the loop ends for any reason.
        """
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind((self.host, self.port))
        self.sock.listen(5)
        try:
            while not self._stop:
                client, addr = self.sock.accept()
                threading.Thread(target=self._handle, args=(
                    client, addr), daemon=True).start()
        finally:
            self.sock.close()

    def stop(self):
        """Ask the accept loop to finish.

        Sets the stop flag, then opens a throw-away connection so a blocked
        ``accept()`` returns and the loop can observe the flag.
        """
        self._stop = True
        # The wildcard address is valid for binding but not a portable
        # connect target; use loopback to reach the listener instead.
        wake_host = '127.0.0.1' if self.host in ('', '0.0.0.0') else self.host
        try:
            with socket.create_connection((wake_host, self.port), timeout=1):
                pass
        except OSError:
            # Best effort: the listener is already gone or unreachable.
            pass
|
||
|
||
|
||
if __name__ == '__main__':
    # Development entry point: serve ./www on the loopback interface until
    # interrupted with Ctrl+C.
    s = PyWPRServer(host='127.0.0.1', port=8000, doc_root='www')
    address = f'{s.host}:{s.port}'
    print('Serving', s.doc_root, 'on', address)
    try:
        s.start()
    except KeyboardInterrupt:
        s.stop()
        print('Stopped.')
|