This repository has been archived on 2026-03-30. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
PyWPR/main.py
2025-08-24 22:09:37 +03:00

236 lines
9.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import socket
import threading
import os
import mimetypes
from urllib.parse import unquote
from html import escape
import io
import sys
from contextlib import redirect_stdout
import re
import traceback
import linecache
class pyp_Functions:
@staticmethod
def e(text: str | int | float, ignoreType=False):
if isinstance(text, (str, int, float)):
print(text)
elif ignoreType:
print(escape(str(text)))
else:
raise ValueError(
f"Unsupported type for echo function,(req str or number, got {str(type(text))}) use ignoreType=True to force str conversion")
@staticmethod
def f_r(path: str):
with open(path, "r", encoding="utf-8", errors="ignore") as f:
return f.read()
class pyp_IO:
    """Per-request context object handed to page scripts.

    Attributes:
        path: the request path this object was created with.
        query: the raw query string (the part after ``?``), possibly empty.
        server_pipeline: directives from the script back to the server;
            starts as ``{'Redirect': False}``.
        GET: decoded query parameters as ``name -> value``. When a name is
            repeated the last value wins; a name without ``=`` maps to ``''``.
    """

    def __init__(self, path, query) -> None:
        # Function-scope import keeps this block self-contained; the file's
        # top-level imports only provide ``unquote``.
        from urllib.parse import parse_qsl

        self.path = path
        self.query = query
        self.server_pipeline: dict[str, bool | str | None | int] = {
            'Redirect': False}
        # parse_qsl applies form decoding: '+' becomes a space, %XX escapes
        # are decoded, and empty pairs (e.g. "a=1&&b=2") are skipped instead
        # of producing an empty-string key.
        self.GET = dict(parse_qsl(query, keep_blank_values=True)) if query else {}

    def redirect(self, url: str):
        """Record *url* as the redirect target in ``server_pipeline``."""
        self.server_pipeline['Redirect'] = url
        # TODO: Implement redirect logic in server
class SandboxInstance:
    """Execute page-script snippets in a shared namespace and capture their output.

    All snippets run through the same ``globals_dict``, so names defined by
    one snippet are visible to the next.

    NOTE(security): despite the name, nothing is restricted here. Snippets run
    through ``exec`` with the full privileges of the server process, so only
    trusted page sources may be executed.

    NOTE(concurrency): ``run`` temporarily changes process-wide state
    (``sys.stdout``, the working directory, ``sys.path``). Calling it from
    several threads at once can interleave output between calls.
    """

    def __init__(self, initial_globals=None):
        # Start from an empty namespace, plus any extras the caller passed in.
        self.globals_dict = {}
        if initial_globals:
            self.globals_dict.update(initial_globals)

    @staticmethod
    def _render_error(exc, tb_text):
        """Return the HTML error panel for *exc* and its formatted traceback."""
        message = escape(str(exc))
        # Built outside the f-string: backslashes inside f-string expressions
        # are a SyntaxError before Python 3.12.
        trace_html = (escape(tb_text)
                      .replace('\n', '<br>')
                      .replace(' ', '&nbsp;')
                      .replace('\t', '&nbsp;' * 4))
        return (
            '<div style="font-size:medium;font-family:sans-serif;">'
            'error while running script: <span style="font-family:monospace;'
            'background:black;color:white;padding:4px;border-radius:5px;margin:0 10px;'
            f'">{message}</span> 💥<br><div style="'
            'font-family:monospace;background:black;color:white;border-radius:15px;'
            f'padding:10px;margin:10px 0;">{trace_html}</div><br><br></div>'
        )

    def run(self, script_str, path, additional_globals=None):
        """Execute *script_str* and return what it printed.

        Args:
            script_str: Python source to execute.
            path: path of the page the snippet belongs to. Its directory
                becomes the working directory and the first ``sys.path``
                entry while the snippet runs.
            additional_globals: optional names merged into the shared
                namespace before execution.

        Returns:
            The captured standard output with surrounding whitespace
            stripped, or an HTML error panel if the snippet raised an
            ``Exception``. Output printed before the error is discarded.
        """
        original_dir = os.getcwd()
        original_sys_path = sys.path.copy()
        if additional_globals:
            self.globals_dict.update(additional_globals)
        output_buffer = io.StringIO()
        fake_filename = "<pyp_script>"
        # A bare filename has an empty dirname and os.chdir('') would fail.
        script_dir = os.path.dirname(path) or os.curdir
        original_dont_write = sys.dont_write_bytecode
        sys.dont_write_bytecode = True
        # Register the source under the fake filename so traceback formatting
        # can look up the snippet's source lines.
        linecache.cache[fake_filename] = (
            len(script_str), None, script_str.splitlines(True), fake_filename)
        with redirect_stdout(output_buffer):
            try:
                # Run the snippet from the script's own directory.
                os.chdir(script_dir)
                # Make modules next to the script importable.
                sys.path.insert(0, script_dir)
                exec(compile(script_str, fake_filename, 'exec'), self.globals_dict)
            except Exception as e:
                # Skip the first traceback entry (this method's own frame) so
                # the report starts at the snippet. Filtering by the text
                # "exec" would also drop legitimate lines such as
                # "ValueError: cannot execute".
                tb_text = ''.join(traceback.format_exception(
                    type(e), e, e.__traceback__.tb_next))
                return self._render_error(e, tb_text)
            finally:
                os.chdir(original_dir)
                sys.path = original_sys_path
                sys.dont_write_bytecode = original_dont_write
                linecache.cache.pop(fake_filename, None)
        return output_buffer.getvalue().strip()
def normalize_indentation(s):
    """Strip the common leading indentation from an embedded code snippet.

    The leading whitespace (spaces and/or tabs) of the first non-blank line
    is taken as the snippet's base indentation and removed from every
    non-blank line. Blank and whitespace-only lines become empty lines, so
    line numbers of the result match those of the input.

    Args:
        s: snippet source; may be empty or ``None``.

    Returns:
        The dedented source joined with ``'\\n'``. If a non-blank line does
        not start with the base indentation, a one-line Python statement is
        returned instead that raises ``ValueError`` naming the offending
        1-based line, so the problem surfaces when the snippet is executed.
    """
    if not s:
        return ''
    lines = s.splitlines()
    first_code_line = next((line for line in lines if line.strip()), None)
    if first_code_line is None:
        prefix = ''
    else:
        indent_width = len(first_code_line) - len(first_code_line.lstrip(' \t'))
        prefix = first_code_line[:indent_width]
    result = []
    # 1-based numbering, matching the line numbers Python reports in
    # tracebacks for the dedented snippet.
    for number, line in enumerate(lines, start=1):
        if not line.strip():
            result.append('')
        elif line.startswith(prefix):
            result.append(line[len(prefix):])
        else:
            # raise in sandbox
            return f'raise ValueError("Indentation error at line {number}")'
    return '\n'.join(result)
class PyWPRServer:
    """Minimal threaded HTTP server for static files and ``.pyp`` templates.

    Only ``GET`` is supported. Each accepted connection is handled in its own
    daemon thread, answers exactly one request and is then closed. Files are
    served from ``doc_root``; ``.pyp`` files are treated as HTML templates
    whose ``<py!>...</py!>`` blocks are executed and whose ``<p?>...</p?>``
    expressions are echoed.
    """

    # Both template tags in one pattern, so a page is tokenised exactly once.
    _TAG_RE = re.compile(
        r'<py!>(?P<code>.*?)</py!>|<p\?>(?P<expr>.*?)</p\?>', re.DOTALL)

    def __init__(self, host='0.0.0.0', port=8000, doc_root='www'):
        """Store the listen address and create *doc_root* if it is missing."""
        self.host, self.port = host, port
        self.doc_root = os.path.abspath(doc_root)
        os.makedirs(self.doc_root, exist_ok=True)
        self.sock = None
        self._stop = False

    def _response(self, status_line, headers=None, body=b''):
        """Serialise an HTTP response to bytes.

        Args:
            status_line: full status line, e.g. ``'HTTP/1.1 200 OK'``.
            headers: optional header mapping; it is copied, not modified.
            body: ``bytes``, or ``str`` which is encoded as UTF-8.

        ``Content-Type``, ``Content-Length`` and ``Connection: close`` are
        added unless the caller supplied them.
        """
        headers = dict(headers or {})
        if isinstance(body, str):
            body = body.encode('utf-8')
        headers.setdefault('Content-Type', 'text/html; charset=utf-8')
        headers.setdefault('Content-Length', str(len(body)))
        # The connection is closed after every response, and an HTTP/1.1
        # client assumes keep-alive unless told otherwise.
        headers.setdefault('Connection', 'close')
        hdrs = '\r\n'.join(f'{k}: {v}' for k, v in headers.items())
        return f'{status_line}\r\n{hdrs}\r\n\r\n'.encode('utf-8') + body

    def _not_found(self):
        """Return the canned 404 response."""
        return self._response('HTTP/1.1 404 Not Found', body=b'<h1>404 Not Found</h1>')

    def _inside_doc_root(self, candidate):
        """Return True if the normalised path *candidate* lies within doc_root."""
        try:
            # A component-wise comparison; a plain string-prefix test would
            # also accept siblings such as "<doc_root>-secret".
            return os.path.commonpath([self.doc_root, candidate]) == self.doc_root
        except ValueError:
            # Raised e.g. for paths on different drives on Windows.
            return False

    def _execute_pyp(self, body: str, path: str, query: str):
        """Render a ``.pyp`` template and return the resulting text.

        All ``<py!>`` blocks run first, in document order, followed by all
        ``<p?>`` expressions, so an expression may use a name defined by a
        block that appears later in the page. Every tag is replaced by the
        output of its own run.

        The page is scanned for tags only once, before anything executes.
        Text printed by a snippet (which may contain request data) is
        therefore never re-interpreted as template code.
        """
        # Same resolution as _serve_path, so request targets without a
        # leading slash still map to a path inside doc_root.
        script_path = os.path.normpath(
            os.path.join(self.doc_root, path.lstrip('/')))
        sandbox = SandboxInstance(
            {'e': pyp_Functions.e, 'f_r': pyp_Functions.f_r, "__file__": script_path,
             "__name__": "__pyp__", "pyp": pyp_IO(path, query)})
        matches = list(self._TAG_RE.finditer(body))
        outputs = [''] * len(matches)
        for idx, match in enumerate(matches):
            code = match.group('code')
            if code is not None:
                outputs[idx] = sandbox.run(
                    normalize_indentation(code), path=script_path)
        for idx, match in enumerate(matches):
            expr = match.group('expr')
            if expr is not None:
                outputs[idx] = sandbox.run(f'e({expr})', path=script_path)
        # Stitch the literal text between tags together with the tag outputs.
        pieces = []
        cursor = 0
        for match, output in zip(matches, outputs):
            pieces.append(body[cursor:match.start()])
            pieces.append(output)
            cursor = match.end()
        pieces.append(body[cursor:])
        return ''.join(pieces)

    def _serve_path(self, req_path):
        """Build the full response for the request target *req_path*.

        Directory requests are mapped to their ``index.pyp``. Targets that
        resolve outside ``doc_root`` or to a missing file get a 404;
        unexpected errors while reading or rendering get a 500.
        """
        url_path, _, query = req_path.partition('?')
        path = unquote(url_path)
        if path.endswith('/'):
            path += 'index.pyp'
        elif os.path.isdir(os.path.join(self.doc_root, path.lstrip('/'))):
            path += '/index.pyp'
        full = os.path.normpath(os.path.join(self.doc_root, path.lstrip('/')))
        if not self._inside_doc_root(full) or not os.path.isfile(full):
            return self._not_found()
        is_template = path.endswith('.pyp')
        if is_template:
            ctype = 'text/html'
        else:
            ctype = mimetypes.guess_type(full)[0] or 'application/octet-stream'
        # Textual types are served as UTF-8.
        if ctype.startswith('text/') or ctype in ('application/json', 'application/javascript'):
            ctype = f'{ctype}; charset=utf-8'
        try:
            # Open text files as UTF-8 when possible, binary otherwise
            if ctype.startswith('text/') or ctype.endswith('charset=utf-8'):
                with open(full, 'r', encoding='utf-8', errors='replace') as f:
                    body = f.read()
                if is_template:
                    body = self._execute_pyp(body, path, query)
                return self._response(
                    'HTTP/1.1 200 OK',
                    headers={'Content-Type': ctype, 'Server': 'PyWRP-server'},
                    body=body.encode())
            with open(full, 'rb') as f:
                body = f.read()
            return self._response('HTTP/1.1 200 OK', headers={'Content-Type': ctype}, body=body)
        except Exception:
            # Top-level boundary: report the failure on stderr instead of
            # discarding it, then answer with a generic 500.
            traceback.print_exc()
            return self._response('HTTP/1.1 500 Internal Server Error', body=b'<h1>500</h1>')

    def _handle(self, client, addr):
        """Serve a single request on *client* and close the connection.

        Only the request line is inspected; it is read with one ``recv`` of
        at most 4096 bytes. *addr* is accepted for the thread signature but
        is not used.
        """
        try:
            data = client.recv(4096).decode('utf-8', errors='replace')
            if not data:
                return
            first = data.splitlines()[0]
            parts = first.split()
            if len(parts) < 2:
                client.sendall(self._response(
                    'HTTP/1.1 400 Bad Request', body=b'<h1>400</h1>'))
                return
            method, path = parts[0], parts[1]
            if method != 'GET':
                client.sendall(self._response(
                    'HTTP/1.1 405 Method Not Allowed', headers={'Allow': 'GET'}, body=b'<h1>405</h1>'))
                return
            resp = self._serve_path(path)
            client.sendall(resp)
        finally:
            client.close()

    def start(self):
        """Bind, listen and accept connections until ``stop`` is called.

        Blocks the calling thread. The listening socket is closed when the
        loop ends, including when it ends through an exception.
        """
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind((self.host, self.port))
        self.sock.listen(5)
        try:
            while not self._stop:
                client, addr = self.sock.accept()
                threading.Thread(target=self._handle, args=(
                    client, addr), daemon=True).start()
        finally:
            self.sock.close()

    def stop(self):
        """Ask the accept loop to exit.

        Sets the stop flag, then opens a throwaway connection so a blocked
        ``accept`` returns and the loop can see the flag. Failure to connect
        (for example because the socket is already closed) is ignored.
        """
        self._stop = True
        try:
            with socket.create_connection((self.host, self.port), timeout=1):
                pass
        except OSError:
            # Best effort: nothing is listening any more, so there is
            # nothing left to wake up.
            pass
if __name__ == '__main__':
    # Development entry point: serve ./www on the loopback interface until
    # interrupted with Ctrl+C.
    server = PyWPRServer(host='127.0.0.1', port=8000, doc_root='www')
    print(f'Serving {server.doc_root} on {server.host}:{server.port}')
    try:
        server.start()
    except KeyboardInterrupt:
        server.stop()
        print('Stopped.')