#!/usr/bin/env python3
"""
Runs a command inside a pseudo-terminal with a fixed size and timeout, then
converts its ANSI output to HTML.
"""
import argparse
import fcntl
import json
import os
import pty
import re
import select
import signal
import shlex
import struct
import subprocess
import termios
import time
import sys
from typing import List, Dict, Any, Optional
DEFAULT_BASIC_COLORS = (
"#000000",
"#800000",
"#008000",
"#808000",
"#000080",
"#800080",
"#008080",
"#c0c0c0",
)
DEFAULT_BRIGHT_COLORS = (
"#808080",
"#ff0000",
"#00ff00",
"#ffff00",
"#0000ff",
"#ff00ff",
"#00ffff",
"#ffffff",
)
BASIC_COLORS = DEFAULT_BASIC_COLORS
BRIGHT_COLORS = DEFAULT_BRIGHT_COLORS
ALL_COLORS = BASIC_COLORS + BRIGHT_COLORS
def set_color_palette(color_map: Dict[str, Any]) -> None:
values = []
for i in range(16):
val = color_map.get(f"color{i}")
if not (isinstance(val, str) and re.match(r"^#[0-9a-fA-F]{6}$", val)):
return
values.append(val)
global BASIC_COLORS, BRIGHT_COLORS, ALL_COLORS
BASIC_COLORS = tuple(values[:8])
BRIGHT_COLORS = tuple(values[8:])
ALL_COLORS = BASIC_COLORS + BRIGHT_COLORS
def create_state() -> Dict[str, Any]:
return {
"row": 0,
"col": 0,
"lines": [[]],
"fg": None,
"bg": None,
"bold": False,
"ignoreClearsAfterAltExit": False,
"savedRow": 0,
"savedCol": 0,
}
def escape_html(text: str) -> str:
return (
text.replace("&", "&")
.replace("<", "<")
.replace(">", ">")
.replace('"', """)
)
def ansi256_to_hex(code: str):
try:
c = int(code, 10)
except ValueError:
return None
if 0 <= c < 16:
return ALL_COLORS[c]
if 16 <= c <= 231:
idx = c - 16
r = idx // 36
g = (idx % 36) // 6
b = idx % 6
values = [r, g, b]
comps = [0 if n == 0 else 55 + n * 40 for n in values]
return "#" + "".join(f"{v:02x}" for v in comps)
if 232 <= c <= 255:
level = 8 + (c - 232) * 10
return f"#{level:02x}{level:02x}{level:02x}"
return None
def basic_color(code: str):
try:
c = int(code, 10)
except ValueError:
return None
if 30 <= c <= 37:
return BASIC_COLORS[c - 30]
if 90 <= c <= 97:
return BRIGHT_COLORS[c - 90]
if 40 <= c <= 47:
return BASIC_COLORS[c - 40]
if 100 <= c <= 107:
return BRIGHT_COLORS[c - 100]
return None
def rgb_to_hex(r: str, g: str, b: str):
try:
values = [int(r), int(g), int(b)]
except ValueError:
return None
if any(v < 0 or v > 255 for v in values):
return None
return "#" + "".join(f"{v:02x}" for v in values)
def current_style(state: Dict[str, Any]) -> str:
parts: List[str] = []
if state["fg"]:
parts.append(f"color:{state['fg']}")
if state["bg"]:
parts.append(f"background-color:{state['bg']}")
if state["bold"]:
parts.append("font-weight:bold")
return ";".join(parts)
def ensure_line(state: Dict[str, Any], row: int) -> None:
while len(state["lines"]) <= row:
state["lines"].append([])
def set_cursor(state: Dict[str, Any], row: int, col: int) -> None:
state["row"] = max(0, row)
state["col"] = max(0, col)
ensure_line(state, state["row"])
def apply_sgr(state: Dict[str, Any], code_str: str) -> None:
codes = ["0"] if code_str == "" else [c for c in code_str.split(";") if c != ""]
i = 0
while i < len(codes):
code_num = codes[i]
if code_num == "?25":
i += 1
continue
num = int(code_num) if code_num.isdigit() else None
if num == 0:
state["fg"] = None
state["bg"] = None
state["bold"] = False
elif num == 1:
state["bold"] = True
elif num == 22:
state["bold"] = False
elif num == 39:
state["fg"] = None
elif num == 49:
state["bg"] = None
elif num == 38 and i + 1 < len(codes) and codes[i + 1] == "5":
state["fg"] = ansi256_to_hex(codes[i + 2]) if i + 2 < len(codes) else None
i += 2
elif num == 48 and i + 1 < len(codes) and codes[i + 1] == "5":
state["bg"] = ansi256_to_hex(codes[i + 2]) if i + 2 < len(codes) else None
i += 2
elif num == 38 and i + 3 < len(codes) and codes[i + 1] == "2":
state["fg"] = rgb_to_hex(codes[i + 2], codes[i + 3], codes[i + 4]) if i + 4 < len(codes) else None
i += 4
elif num == 48 and i + 3 < len(codes) and codes[i + 1] == "2":
state["bg"] = rgb_to_hex(codes[i + 2], codes[i + 3], codes[i + 4]) if i + 4 < len(codes) else None
i += 4
elif num is not None and ((30 <= num <= 37) or (90 <= num <= 97)):
state["fg"] = basic_color(str(num))
elif num is not None and ((40 <= num <= 47) or (100 <= num <= 107)):
state["bg"] = basic_color(str(num))
i += 1
def apply_csi(state: Dict[str, Any], params: Optional[str], code: str) -> None:
if params is None:
cleaned = ""
parts: List[str] = []
else:
cleaned = re.sub(r"\?", "", params)
parts = [] if cleaned == "" else [p for p in cleaned.split(";") if p != ""]
first = int(parts[0]) if parts and parts[0].isdigit() else None
mode = parts[0] if parts else ""
is_private = params is not None and "?" in params
if code == "m":
apply_sgr(state, cleaned)
elif code == "d":
set_cursor(state, max(0, (first or 1) - 1), state["col"])
elif code == "a":
set_cursor(state, state["row"], state["col"] + (first or 1))
elif code == "e":
set_cursor(state, state["row"] + (first or 1), state["col"])
elif code == "`":
set_cursor(state, state["row"], max(0, (first or 1) - 1))
elif code in ("h", "l") and mode == "1049":
if code == "h":
state["lines"] = [[]]
set_cursor(state, 0, 0)
state["ignoreClearsAfterAltExit"] = False
else:
set_cursor(state, 0, 0)
state["ignoreClearsAfterAltExit"] = True
elif code in ("h", "l") and is_private:
pass
elif code in ("h", "l"):
pass
elif code == "s":
state["savedRow"] = state["row"]
state["savedCol"] = state["col"]
elif code == "u":
set_cursor(state, state.get("savedRow", 0), state.get("savedCol", 0))
elif code == "A":
set_cursor(state, state["row"] - (first or 1), state["col"])
elif code == "B":
set_cursor(state, state["row"] + (first or 1), state["col"])
elif code == "C":
set_cursor(state, state["row"], state["col"] + (first or 1))
elif code == "D":
if params is None:
set_cursor(state, state["row"] + 1, state["col"])
else:
set_cursor(state, state["row"], max(0, state["col"] - (first or 1)))
elif code == "G":
set_cursor(state, state["row"], max(0, (first or 1) - 1))
elif code in ("H", "f"):
row_val = first or 1
col_val = int(parts[1]) if len(parts) > 1 and parts[1].isdigit() else 1
set_cursor(state, row_val - 1, max(0, col_val - 1))
elif code == "E":
if params is None:
set_cursor(state, state["row"] + 1, 0)
else:
set_cursor(state, state["row"] + (first or 1), 0)
elif code == "F":
set_cursor(state, max(0, state["row"] - (first or 1)), 0)
elif code == "M":
if params is None:
set_cursor(state, max(0, state["row"] - 1), state["col"])
else:
delete_count = first or 1
for _ in range(delete_count):
if state["row"] < len(state["lines"]):
state["lines"].pop(state["row"])
state["lines"].append([])
ensure_line(state, state["row"])
elif code == "K":
ensure_line(state, state["row"])
line = state["lines"][state["row"]]
start = state["col"]
for i in range(start, len(line)):
line[i] = {"ch": " ", "style": ""}
elif code == "J":
ensure_line(state, state["row"])
mode_num = 0 if first is None else first
if not state["ignoreClearsAfterAltExit"]:
if mode_num == 2:
state["lines"] = [[]]
set_cursor(state, 0, 0)
else:
for r in range(state["row"], len(state["lines"])):
line = state["lines"][r]
start = state["col"] if r == state["row"] else 0
for i in range(start, len(line)):
line[i] = {"ch": " ", "style": ""}
elif code == "r":
set_cursor(state, 0, 0)
elif code == "t":
pass
def write_char(state: Dict[str, Any], ch: str) -> None:
ensure_line(state, state["row"])
line = state["lines"][state["row"]]
while len(line) <= state["col"]:
line.append({"ch": " ", "style": ""})
line[state["col"]] = {"ch": ch, "style": current_style(state)}
state["col"] += 1
def append_text(state: Dict[str, Any], text: str) -> None:
for ch in text:
if ch == "\n":
set_cursor(state, state["row"] + 1, 0)
elif ch == "\r":
set_cursor(state, state["row"], 0)
elif ch == "\b":
set_cursor(state, state["row"], max(0, state["col"] - 1))
elif ch == "\t":
next_stop = ((state["col"] // 8) + 1) * 8
set_cursor(state, state["row"], next_stop)
else:
write_char(state, ch)
def parse_ansi(state: Dict[str, Any], text: str) -> None:
normalized = text.replace("\r\n", "\n").replace("\r", "\n")
index = 0
while index < len(normalized):
esc_index = normalized.find("\x1b", index)
chunk = normalized[index:] if esc_index == -1 else normalized[index:esc_index]
append_text(state, chunk)
if esc_index == -1:
break
slice_text = normalized[esc_index:]
csi_match = re.match(r"^\x1b\[([0-9?;]*)([A-Za-z])", slice_text)
if csi_match:
apply_csi(state, csi_match.group(1), csi_match.group(2))
index = esc_index + len(csi_match.group(0))
continue
esc_move = re.match(r"^\x1b([DEM])", slice_text)
if esc_move:
apply_csi(state, None, esc_move.group(1))
index = esc_index + len(esc_move.group(0))
continue
simple_esc = re.match(r"^\x1b[\(\)][A-Za-z0-9]", slice_text)
if simple_esc:
index = esc_index + len(simple_esc.group(0))
continue
one_char_esc = re.match(r"^\x1b[][><=]", slice_text)
if one_char_esc:
index = esc_index + len(one_char_esc.group(0))
continue
index = esc_index + 1
def lines_to_html(lines: List[List[Dict[str, str]]]) -> str:
def render_line(line: List[Dict[str, str]]) -> str:
if not line:
return " "
html_parts: List[str] = []
open_style = ""
for cell in line:
cell = cell or {"ch": " ", "style": ""}
safe_char = " " if cell["ch"] == " " else escape_html(cell["ch"])
if cell["style"] != open_style:
if open_style:
html_parts.append("")
if cell["style"]:
html_parts.append(f'')
open_style = cell["style"]
html_parts.append(safe_char)
if open_style:
html_parts.append("")
return "".join(html_parts)
return "
".join(render_line(line) for line in lines)
def strip_unhandled_control_codes(raw_text: str) -> str:
# Remove control characters we don't explicitly handle (e.g. SI/SO 0x0e/0x0f) to avoid stray glyphs
# Keep controls we do handle like backspace (0x08), tab/newline/carriage return.
return re.sub(r"[\x00-\x07\x0b-\x0c\x0e-\x0f\x10-\x1a\x1c-\x1f\x7f]", "", raw_text)
def ansi_to_html(raw_text: str) -> str:
state = create_state()
parse_ansi(state, strip_unhandled_control_codes(raw_text))
if len(state["lines"]) > 1 and state["lines"][1]:
first_cell = state["lines"][1][0]
if first_cell.get("ch") == ">":
state["lines"][1][0] = {"ch": " ", "style": first_cell.get("style", "")}
return lines_to_html(state["lines"])
def parse_args():
parser = argparse.ArgumentParser(description="Wrap a command and convert ANSI output to HTML.")
parser.add_argument("--width", type=int, required=True, help="Pseudo-terminal width (columns).")
parser.add_argument("--height", type=int, required=True, help="Pseudo-terminal height (rows).")
parser.add_argument("--timeout", type=float, required=True, help="Timeout in seconds.")
parser.add_argument("--raw", type=int, default=0, help="Return raw command output when set to 1.")
parser.add_argument(
"--colors",
type=str,
help="JSON map of color0-color15 to hex values for the 16-color palette.",
)
parser.add_argument("cmd", nargs=argparse.REMAINDER, help="Command to run after '--'.")
args = parser.parse_args()
command = args.cmd
if command and command[0] == "--":
command = command[1:]
if not command:
parser.error("No command specified. Usage: wrapCommand --width=80 --height=40 --timeout=3 -- cmd")
if len(command) == 1:
single = command[0]
if re.search(r"[|&;<>()$`!{}\n]", single):
command = ["sh", "-c", single]
else:
try:
split_cmd = shlex.split(single)
if split_cmd:
command = split_cmd
except ValueError:
pass
return args.width, args.height, args.timeout, bool(args.raw), command, args.colors
def set_winsize(fd: int, rows: int, cols: int) -> None:
packed = struct.pack("HHHH", rows, cols, 0, 0)
if hasattr(termios, "TIOCSWINSZ"):
fcntl.ioctl(fd, termios.TIOCSWINSZ, packed)
def run_command(cols: int, rows: int, timeout_sec: float, command: List[str]) -> str:
master_fd, slave_fd = pty.openpty()
try:
set_winsize(slave_fd, rows, cols)
proc = subprocess.Popen(
command,
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
close_fds=True,
env={**os.environ, "TERM": "xterm-256color"},
)
finally:
os.close(slave_fd)
output = bytearray()
deadline = time.time() + timeout_sec
killed = False
while True:
if not killed and time.time() >= deadline and proc.poll() is None:
os.kill(proc.pid, signal.SIGKILL)
killed = True
rlist, _, _ = select.select([master_fd], [], [], 0.1)
if rlist:
try:
chunk = os.read(master_fd, 4096)
except OSError:
break
if chunk:
output.extend(chunk)
continue
break
if proc.poll() is not None:
drain, _, _ = select.select([master_fd], [], [], 0)
if not drain:
break
try:
proc.wait(timeout=0)
except subprocess.TimeoutExpired:
pass
finally:
os.close(master_fd)
return output.decode(errors="replace")
def main():
width, height, timeout_sec, raw_output, command, colors_json = parse_args()
if colors_json:
try:
parsed_colors = json.loads(colors_json)
if isinstance(parsed_colors, dict):
set_color_palette(parsed_colors)
except json.JSONDecodeError:
pass
raw_text = run_command(width, height, timeout_sec, command)
if raw_output:
sys.stdout.write(raw_text)
else:
html = ansi_to_html(raw_text)
print(html)
if __name__ == "__main__":
main()