import socket import threading import os import mimetypes from urllib.parse import unquote from html import escape import io import sys from contextlib import redirect_stdout import re import traceback import linecache class pyp_Functions: @staticmethod def e(text: str | int | float, ignoreType=False): if isinstance(text, (str, int, float)): print(text) elif ignoreType: print(escape(str(text))) else: raise ValueError( f"Unsupported type for echo function,(req str or number, got {str(type(text))}) use ignoreType=True to force str conversion") @staticmethod def f_r(path: str): with open(path, "r", encoding="utf-8", errors="ignore") as f: return f.read() class pyp_IO: def __init__(self, path, query) -> None: self.path = path self.query = query self.server_pipeline: dict[str, bool | str | None | int] = {'Redirect': False} self.GET = {} if query: pairs = query.split('&') for pair in pairs: if '=' in pair: key, value = pair.split('=', 1) self.GET[unquote(key)] = unquote(value) else: self.GET[unquote(pair)] = '' def redirect(self, url: str): self.server_pipeline['Redirect'] = url # TODO: Implement redirect logic in server class SandboxInstance: def __init__(self, initial_globals=None): # Базовые globals, плюс твои доп, если кинут self.globals_dict = {} if initial_globals: self.globals_dict.update(initial_globals) def run(self, script_str, path, additional_globals=None): original_dir = os.getcwd() original_sys_path = sys.path.copy() if additional_globals: self.globals_dict.update(additional_globals) output_buffer = io.StringIO() fake_filename = "" original_dont_write = sys.dont_write_bytecode sys.dont_write_bytecode = True linecache.cache[fake_filename] = ( len(script_str), None, script_str.splitlines(True), fake_filename) with redirect_stdout(output_buffer): try: os.chdir(os.path.dirname(path)) # Сброс текущей директории # Добавление директории скрипта в sys.path sys.path.insert(0, os.path.dirname(path)) exec(compile(script_str, fake_filename, 'exec'), self.globals_dict) except Exception as e: tb_lines = traceback.format_exception( type(e), e, e.__traceback__) tb = [ line for line in tb_lines if "exec" not in line or fake_filename in line] return f"
error while running script: {escape(str(e))} 💥
{escape(''.join(tb)).replace('\n', '
').replace(' ', ' ').replace('\t', ' '*4)}


" finally: os.chdir(original_dir) sys.path = original_sys_path sys.dont_write_bytecode = original_dont_write linecache.cache.pop(fake_filename, None) return output_buffer.getvalue().strip() def normalize_indentation(s): if not s: return '' lines = s.splitlines() n = 0 first_non_empty = next((line for line in lines if line.lstrip(' ')), None) if first_non_empty: n = len(first_non_empty) - len(first_non_empty.lstrip(' ')) result = [] for i, line in enumerate(lines): if line.lstrip(' ') and len(line) - len(line.lstrip(' ')) < n: # raise in sandbox return f"raise ValueError(\"Indentation error at line {i}\")" result.append(line[n:] if line.lstrip(' ') else '') return '\n'.join(result) class PyWPRServer: def __init__(self, host='0.0.0.0', port=8000, doc_root='www'): self.host, self.port = host, port self.doc_root = os.path.abspath(doc_root) os.makedirs(self.doc_root, exist_ok=True) self.sock = None self._stop = False def _response(self, status_line, headers=None, body=b''): headers = headers or {} if isinstance(body, str): body = body.encode('utf-8') headers.setdefault('Content-Type', 'text/html; charset=utf-8') headers.setdefault('Content-Length', str(len(body))) hdrs = '\r\n'.join(f'{k}: {v}' for k, v in headers.items()) return (f'{status_line}\r\n{hdrs}\r\n\r\n').encode('utf-8') + body def _not_found(self): return self._response('HTTP/1.1 404 Not Found', body=b'

404 Not Found

') def _execute_pyp(self, body: str, path: str, query: str): script_path = self.doc_root + path sandbox = SandboxInstance( {'e': pyp_Functions.e, 'f_r': pyp_Functions.f_r, "__file__": script_path, "__name__": "__pyp__", "pyp": pyp_IO(path, query)}) pattern = re.compile(r'(.*?)', re.DOTALL) snippets = pattern.findall(body) for snippet in snippets: snippet_code = normalize_indentation(snippet) result = sandbox.run(snippet_code, path=script_path) body = body.replace(f'{snippet}', result) eq_pattern = re.compile(r'(.*?)', re.DOTALL) eq_snippets = eq_pattern.findall(body) for eq_snippet in eq_snippets: result = sandbox.run(f'e({eq_snippet})', path=script_path) body = body.replace(f'{eq_snippet}', result) return body def _serve_path(self, req_path): # print("Request:", req_path) path = unquote(req_path.split('?', 1)[0]) if path.endswith('/'): path += 'index.pyp' elif os.path.isdir(self.doc_root + path): path += '/index.pyp' full = os.path.normpath(os.path.join(self.doc_root, path.lstrip('/'))) if not full.startswith(self.doc_root) or not os.path.isfile(full): return self._not_found() ctype = (mimetypes.guess_type(full)[ 0] or 'application/octet-stream') if not path.endswith(".pyp") else 'text/html' # if text/* or html, add charset=utf-8 if ctype.startswith('text/') or ctype == 'application/json' or ctype == 'application/javascript': ctype = f'{ctype}; charset=utf-8' try: # Open text files as UTF-8 when possible, binary otherwise if ctype.startswith('text/') or ctype.endswith('charset=utf-8'): with open(full, 'r', encoding='utf-8', errors='replace') as f: body = f.read() if path.endswith('.pyp'): query = req_path.split( '?', 1)[1] if '?' in req_path else '' body = self._execute_pyp(body, path, query) return self._response('HTTP/1.1 200 OK', headers={'Content-Type': ctype, 'Server': 'PyWRP-server'}, body=body.encode()) else: with open(full, 'rb') as f: body = f.read() return self._response('HTTP/1.1 200 OK', headers={'Content-Type': ctype}, body=body) except Exception: return self._response('HTTP/1.1 500 Internal Server Error', body=b'

500

') def _handle(self, client, addr): try: data = client.recv(4096).decode('utf-8', errors='replace') if not data: return first = data.splitlines()[0] parts = first.split() if len(parts) < 2: client.sendall(self._response( 'HTTP/1.1 400 Bad Request', body=b'

400

')) return method, path = parts[0], parts[1] if method != 'GET': client.sendall(self._response( 'HTTP/1.1 405 Method Not Allowed', headers={'Allow': 'GET'}, body=b'

405

')) return resp = self._serve_path(path) client.sendall(resp) finally: client.close() def start(self): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.sock.bind((self.host, self.port)) self.sock.listen(5) try: while not self._stop: client, addr = self.sock.accept() threading.Thread(target=self._handle, args=( client, addr), daemon=True).start() finally: self.sock.close() def stop(self): self._stop = True try: with socket.create_connection((self.host, self.port), timeout=1): pass except Exception: pass if __name__ == '__main__': s = PyWPRServer(host='127.0.0.1', port=8000, doc_root='www') print('Serving', s.doc_root, 'on', f'{s.host}:{s.port}') try: s.start() except KeyboardInterrupt: s.stop() print('Stopped.')