#!/usr/bin/python3
"""Mirror an authorized_keys file, wrapping every forced command in podman exec.

The script reads the input authorized_keys file, prefixes the value of each
``command="..."`` option with a ``podman exec`` invocation, writes the result
to the output authorized_keys file and then keeps watching the input file so
the output is regenerated on every change.

NOTE(review): the ``{{ homedir }}`` placeholders in the paths below look like
template variables that are substituted before deployment - confirm.
"""
import os
import re
import sys
import tempfile
import time

try:
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler
except ImportError:
    watchdog_installed = False
else:
    watchdog_installed = True

# Captures the value of the first command="..." option on a line.
_COMMAND_OPTION = re.compile(r'command="([^"]+)"')


def rewrite_ssh_command(original_command):
    """Return *original_command* prefixed with the podman exec invocation."""
    return "/usr/bin/podman exec -i -e SSH_ORIGINAL_COMMAND -u git forgejo " + original_command


def _rewrite_line(line):
    """Return *line* with the value of its command="..." option rewritten.

    Only the matched span is replaced, so an identical string elsewhere on the
    line (for example in the key comment) is left untouched. Lines without a
    command option are returned unchanged.
    """
    match = _COMMAND_OPTION.search(line)
    if match is None:
        return line
    start, end = match.span(1)
    return line[:start] + rewrite_ssh_command(match.group(1)) + line[end:]


def _write_atomically(path, lines):
    """Write *lines* to *path* through a 0600 temp file and an atomic rename.

    Readers of *path* therefore see either the old or the new content, never
    a truncated file, and the content is never exposed with wider permissions.
    """
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".authorized_keys.")
    try:
        # surrogateescape mirrors the reader so undecodable bytes round-trip.
        with os.fdopen(fd, "w", encoding="utf-8", errors="surrogateescape") as handle:
            handle.writelines(lines)
        os.chmod(tmp_path, 0o600)
        os.replace(tmp_path, path)
    except BaseException:
        # Do not leave a stray temp file behind when anything goes wrong.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise


def process_file(input_file_path, output_file_path):
    """Copy the input file to the output file, rewriting forced commands.

    Args:
        input_file_path: authorized_keys file to read.
        output_file_path: authorized_keys file to (re)create with mode 0600.

    Raises:
        OSError: if the input cannot be read or the output cannot be written.
    """
    with open(input_file_path, "r", encoding="utf-8", errors="surrogateescape") as file:
        new_lines = [_rewrite_line(line) for line in file]
    _write_atomically(output_file_path, new_lines)


def _run_callback(callback):
    """Run *callback*; report an OSError instead of letting it kill the watcher.

    Returns True when the callback completed, False when it raised OSError
    (e.g. the watched file was briefly missing while being replaced).
    """
    try:
        callback()
    except OSError as error:
        print(f"Processing failed, will retry on next change: {error}", file=sys.stderr)
        return False
    return True


def _modification_time(path):
    """Return the mtime of *path*, or None when it cannot be stat'ed."""
    try:
        return os.path.getmtime(path)
    except OSError:
        return None


if watchdog_installed:

    class FileChangeHandler(FileSystemEventHandler):
        """Run a callback when the watched file is modified, created or moved.

        Open/close events are deliberately not handled: the callback itself
        reads the watched file, and reacting to those would re-trigger it.
        """

        def __init__(self, callback, watched_path=None):
            super().__init__()
            self._callback = callback
            # None means "react to every file event in the directory".
            self._watched_path = (
                None if watched_path is None else os.path.abspath(watched_path)
            )

        def _concerns_watched_file(self, event):
            """Tell whether *event* touches the watched file."""
            if event.is_directory:
                return False
            if self._watched_path is None:
                return True
            # A rename into place reports the target as dest_path.
            candidates = (event.src_path, getattr(event, "dest_path", ""))
            return any(
                path and os.path.abspath(os.fsdecode(path)) == self._watched_path
                for path in candidates
            )

        def _dispatch_callback(self, event):
            if self._concerns_watched_file(event):
                _run_callback(self._callback)

        def on_modified(self, event):
            self._dispatch_callback(event)

        def on_created(self, event):
            self._dispatch_callback(event)

        def on_moved(self, event):
            self._dispatch_callback(event)


def watch_file(file_to_watch, callback):
    """Block forever, calling *callback* whenever *file_to_watch* changes.

    Uses watchdog when it is importable; otherwise falls back to comparing the
    file's modification time every three seconds.
    """
    if watchdog_installed:
        event_handler = FileChangeHandler(callback, file_to_watch)
        observer = Observer()
        # The containing directory is watched so a replaced file is noticed too.
        watch_dir = os.path.dirname(os.path.abspath(file_to_watch))
        observer.schedule(event_handler, path=watch_dir, recursive=False)
        observer.start()
        try:
            while True:
                time.sleep(3600)
        except KeyboardInterrupt:
            pass
        finally:
            observer.stop()
            observer.join()
    else:
        print("Watchdog not installed. Using polling method. This is not optimal, please install the python module watchdog for better performance")
        last_modified_time = _modification_time(file_to_watch)
        while True:
            current_modified_time = _modification_time(file_to_watch)
            if current_modified_time is not None and current_modified_time != last_modified_time:
                print(f"{file_to_watch} has been modified. Executing callback...")
                # Remember the mtime only on success so a failed run is retried.
                if _run_callback(callback):
                    last_modified_time = current_modified_time
            time.sleep(3)


if __name__ == "__main__":
    if os.getuid() != 0 or '_CONTAINERS_ROOTLESS_UID' not in os.environ:
        print("This program needs to be root of your user namespace in order to have the correct permissions")
        print("start this program with podman unshare " + sys.argv[0])
        sys.exit(1)

    input_file = "{{ homedir }}/data/git/.ssh/authorized_keys"
    output_file = "{{ homedir }}/.ssh/authorized_keys"

    output_dir = os.path.dirname(output_file)
    if not os.path.exists(output_dir):
        os.makedirs(output_dir, exist_ok=True)
        os.chmod(output_dir, 0o700)

    def callback_function():
        process_file(input_file, output_file)

    # ensure it is at least processed once...
    process_file(input_file, output_file)

    watch_file(input_file, callback_function)