first commit

This commit is contained in:
2025-08-24 22:09:37 +03:00
commit 176fc62a24
14 changed files with 673 additions and 0 deletions

235
main.py Normal file
View File

@@ -0,0 +1,235 @@
import socket
import threading
import os
import mimetypes
from urllib.parse import unquote
from html import escape
import io
import sys
from contextlib import redirect_stdout
import re
import traceback
import linecache
class pyp_Functions:
@staticmethod
def e(text: str | int | float, ignoreType=False):
if isinstance(text, (str, int, float)):
print(text)
elif ignoreType:
print(escape(str(text)))
else:
raise ValueError(
f"Unsupported type for echo function,(req str or number, got {str(type(text))}) use ignoreType=True to force str conversion")
@staticmethod
def f_r(path: str):
with open(path, "r", encoding="utf-8", errors="ignore") as f:
return f.read()
class pyp_IO:
def __init__(self, path, query) -> None:
self.path = path
self.query = query
self.server_pipeline: dict[str, bool |
str | None | int] = {'Redirect': False}
self.GET = {}
if query:
pairs = query.split('&')
for pair in pairs:
if '=' in pair:
key, value = pair.split('=', 1)
self.GET[unquote(key)] = unquote(value)
else:
self.GET[unquote(pair)] = ''
def redirect(self, url: str):
self.server_pipeline['Redirect'] = url
# TODO: Implement redirect logic in server
class SandboxInstance:
def __init__(self, initial_globals=None):
# Базовые globals, плюс твои доп, если кинут
self.globals_dict = {}
if initial_globals:
self.globals_dict.update(initial_globals)
def run(self, script_str, path, additional_globals=None):
original_dir = os.getcwd()
original_sys_path = sys.path.copy()
if additional_globals:
self.globals_dict.update(additional_globals)
output_buffer = io.StringIO()
fake_filename = "<pyp_script>"
original_dont_write = sys.dont_write_bytecode
sys.dont_write_bytecode = True
linecache.cache[fake_filename] = (
len(script_str), None, script_str.splitlines(True), fake_filename)
with redirect_stdout(output_buffer):
try:
os.chdir(os.path.dirname(path)) # Сброс текущей директории
# Добавление директории скрипта в sys.path
sys.path.insert(0, os.path.dirname(path))
exec(compile(script_str, fake_filename, 'exec'), self.globals_dict)
except Exception as e:
tb_lines = traceback.format_exception(
type(e), e, e.__traceback__)
tb = [
line for line in tb_lines if "exec" not in line or fake_filename in line]
return f"<div style=\"font-size:medium;font-family:sans-serif;\">error while running script: <span style=\"font-family:monospace;" +\
f"background:black;color:white;padding:4px;border-radius:5px;margin:0 10px;" +\
f"\">{escape(str(e))}</span> 💥<br><div style=\"" +\
f"font-family:monospace;background:black;color:white;border-radius:15px;" +\
f"padding:10px;margin:10px 0;\">{escape(''.join(tb)).replace('\n', '<br>').replace(' ', '&nbsp').replace('\t', '&nbsp'*4)}</div><br><br></div>"
finally:
os.chdir(original_dir)
sys.path = original_sys_path
sys.dont_write_bytecode = original_dont_write
linecache.cache.pop(fake_filename, None)
return output_buffer.getvalue().strip()
def normalize_indentation(s):
if not s:
return ''
lines = s.splitlines()
n = 0
first_non_empty = next((line for line in lines if line.lstrip(' ')), None)
if first_non_empty:
n = len(first_non_empty) - len(first_non_empty.lstrip(' '))
result = []
for i, line in enumerate(lines):
if line.lstrip(' ') and len(line) - len(line.lstrip(' ')) < n:
# raise in sandbox
return f"raise ValueError(\"Indentation error at line {i}\")"
result.append(line[n:] if line.lstrip(' ') else '')
return '\n'.join(result)
class PyWPRServer:
def __init__(self, host='0.0.0.0', port=8000, doc_root='www'):
self.host, self.port = host, port
self.doc_root = os.path.abspath(doc_root)
os.makedirs(self.doc_root, exist_ok=True)
self.sock = None
self._stop = False
def _response(self, status_line, headers=None, body=b''):
headers = headers or {}
if isinstance(body, str):
body = body.encode('utf-8')
headers.setdefault('Content-Type', 'text/html; charset=utf-8')
headers.setdefault('Content-Length', str(len(body)))
hdrs = '\r\n'.join(f'{k}: {v}' for k, v in headers.items())
return (f'{status_line}\r\n{hdrs}\r\n\r\n').encode('utf-8') + body
def _not_found(self):
return self._response('HTTP/1.1 404 Not Found', body=b'<h1>404 Not Found</h1>')
def _execute_pyp(self, body: str, path: str, query: str):
script_path = self.doc_root + path
sandbox = SandboxInstance(
{'e': pyp_Functions.e, 'f_r': pyp_Functions.f_r, "__file__": script_path, "__name__": "__pyp__", "pyp": pyp_IO(path, query)})
pattern = re.compile(r'<py!>(.*?)</py!>', re.DOTALL)
snippets = pattern.findall(body)
for snippet in snippets:
snippet_code = normalize_indentation(snippet)
result = sandbox.run(snippet_code, path=script_path)
body = body.replace(f'<py!>{snippet}</py!>', result)
eq_pattern = re.compile(r'<p\?>(.*?)</p\?>', re.DOTALL)
eq_snippets = eq_pattern.findall(body)
for eq_snippet in eq_snippets:
result = sandbox.run(f'e({eq_snippet})', path=script_path)
body = body.replace(f'<p?>{eq_snippet}</p?>', result)
return body
def _serve_path(self, req_path):
# print("Request:", req_path)
path = unquote(req_path.split('?', 1)[0])
if path.endswith('/'):
path += 'index.pyp'
elif os.path.isdir(self.doc_root + path):
path += '/index.pyp'
full = os.path.normpath(os.path.join(self.doc_root, path.lstrip('/')))
if not full.startswith(self.doc_root) or not os.path.isfile(full):
return self._not_found()
ctype = (mimetypes.guess_type(full)[
0] or 'application/octet-stream') if not path.endswith(".pyp") else 'text/html'
# if text/* or html, add charset=utf-8
if ctype.startswith('text/') or ctype == 'application/json' or ctype == 'application/javascript':
ctype = f'{ctype}; charset=utf-8'
try:
# Open text files as UTF-8 when possible, binary otherwise
if ctype.startswith('text/') or ctype.endswith('charset=utf-8'):
with open(full, 'r', encoding='utf-8', errors='replace') as f:
body = f.read()
if path.endswith('.pyp'):
query = req_path.split(
'?', 1)[1] if '?' in req_path else ''
body = self._execute_pyp(body, path, query)
return self._response('HTTP/1.1 200 OK', headers={'Content-Type': ctype, 'Server': 'PyWRP-server'}, body=body.encode())
else:
with open(full, 'rb') as f:
body = f.read()
return self._response('HTTP/1.1 200 OK', headers={'Content-Type': ctype}, body=body)
except Exception:
return self._response('HTTP/1.1 500 Internal Server Error', body=b'<h1>500</h1>')
def _handle(self, client, addr):
try:
data = client.recv(4096).decode('utf-8', errors='replace')
if not data:
return
first = data.splitlines()[0]
parts = first.split()
if len(parts) < 2:
client.sendall(self._response(
'HTTP/1.1 400 Bad Request', body=b'<h1>400</h1>'))
return
method, path = parts[0], parts[1]
if method != 'GET':
client.sendall(self._response(
'HTTP/1.1 405 Method Not Allowed', headers={'Allow': 'GET'}, body=b'<h1>405</h1>'))
return
resp = self._serve_path(path)
client.sendall(resp)
finally:
client.close()
def start(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.bind((self.host, self.port))
self.sock.listen(5)
try:
while not self._stop:
client, addr = self.sock.accept()
threading.Thread(target=self._handle, args=(
client, addr), daemon=True).start()
finally:
self.sock.close()
def stop(self):
self._stop = True
try:
with socket.create_connection((self.host, self.port), timeout=1):
pass
except Exception:
pass
if __name__ == '__main__':
s = PyWPRServer(host='127.0.0.1', port=8000, doc_root='www')
print('Serving', s.doc_root, 'on', f'{s.host}:{s.port}')
try:
s.start()
except KeyboardInterrupt:
s.stop()
print('Stopped.')