forgejo/forgejo-watch-sshkeys.j2

84 lines
2.8 KiB
Text
Raw Permalink Normal View History

#!/usr/bin/python3
import os
import time
import re
import sys
# watchdog is an optional third-party dependency. Record whether it could be
# imported so the rest of the script can choose between event-driven watching
# and the polling fallback.
try:
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler
    watchdog_installed = True
except ImportError:
    watchdog_installed = False
def rewrite_ssh_command(original_command):
    """Return *original_command* prefixed with the podman invocation.

    The prefix is a ``podman exec`` call (``-i``, ``-e SSH_ORIGINAL_COMMAND``,
    ``-u git``) targeting the container named ``forgejo``; the original
    command string is appended unchanged after it.
    """
    podman_exec_prefix = "/usr/bin/podman exec -i -e SSH_ORIGINAL_COMMAND -u git forgejo "
    return "".join((podman_exec_prefix, original_command))
# Matches the command="..." option of an authorized_keys entry; group 1 is the
# command text. Backslash-escaped characters are consumed as pairs so that an
# escaped double quote inside the value does not end the match early.
_COMMAND_OPTION_RE = re.compile(r'command="((?:[^"\\]|\\.)+)"')


def process_file(input_file_path, output_file_path):
    """Copy an authorized_keys file, rewriting each forced command.

    Every line of *input_file_path* that carries a ``command="..."`` option
    gets that command replaced by ``rewrite_ssh_command(command)``; all other
    lines are copied unchanged. The result is written to *output_file_path*
    with mode 0600.

    Args:
        input_file_path: Path of the authorized_keys file to read.
        output_file_path: Path of the authorized_keys file to (re)create.

    Raises:
        OSError: If the input cannot be read or the output cannot be written.
    """
    with open(input_file_path, 'r') as source:
        lines = source.readlines()

    new_lines = []
    for line in lines:
        match = _COMMAND_OPTION_RE.search(line)
        if match is None:
            new_lines.append(line)
            continue
        # Splice at the exact match position. A plain str.replace() would also
        # rewrite any other occurrence of the command text in the line, for
        # example inside the key comment.
        start, end = match.span(1)
        new_lines.append(line[:start] + rewrite_ssh_command(match.group(1)) + line[end:])

    # Write to a sibling temp file and rename it over the target so a reader
    # never sees a truncated or half-written authorized_keys file. A stale temp
    # file left by an interrupted run is simply overwritten.
    tmp_path = output_file_path + ".tmp"
    fd = os.open(tmp_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as target:
        target.writelines(new_lines)
    # The mode passed to os.open() only applies when the file is created.
    os.chmod(tmp_path, 0o600)
    os.replace(tmp_path, output_file_path)
def watch_file(file_to_watch, callback):
    """Call *callback* every time *file_to_watch* changes.

    When watchdog is available, the file's parent directory is observed and
    the callback runs for events that create, modify or move a file onto the
    watched path. Otherwise the file's modification time is polled every three
    seconds.

    Args:
        file_to_watch: Path of the file to monitor.
        callback: Zero-argument callable invoked after each detected change.

    The polling variant never returns. The watchdog variant returns once the
    observer thread has stopped (after Ctrl-C, or if the observer died).
    """
    if watchdog_installed:
        target = os.path.abspath(file_to_watch)

        # Defined locally so this function does not depend on a handler class
        # living elsewhere in the module.
        class _TargetFileHandler(FileSystemEventHandler):
            """Forward events that concern the watched file to the callback."""

            def _notify_if_target(self, event, path):
                if event.is_directory or not path:
                    return
                if os.path.abspath(os.fsdecode(path)) == target:
                    callback()

            def on_created(self, event):
                self._notify_if_target(event, event.src_path)

            def on_modified(self, event):
                self._notify_if_target(event, event.src_path)

            def on_moved(self, event):
                # A rename-based rewrite shows up as a move whose destination
                # is the watched file.
                self._notify_if_target(event, event.dest_path)

        observer = Observer()
        # The directory is watched (not the file) so that replacing the file
        # by rename is still noticed.
        observer.schedule(_TargetFileHandler(), path=os.path.dirname(target), recursive=False)
        observer.start()
        try:
            # Wake up regularly so a dead observer thread ends the loop
            # instead of leaving the process idle forever.
            while observer.is_alive():
                observer.join(1)
        except KeyboardInterrupt:
            pass
        finally:
            observer.stop()
            observer.join()
    else:
        print("Watchdog not installed. Using polling method. This is not optimal, please install the python module watchdog for better performance")
        last_modified_time = os.path.getmtime(file_to_watch)
        while True:
            try:
                current_modified_time = os.path.getmtime(file_to_watch)
            except FileNotFoundError:
                # The file can be absent for a moment while it is being
                # replaced; treat that as "no change" and look again later.
                current_modified_time = last_modified_time
            if current_modified_time != last_modified_time:
                print(f"{file_to_watch} has been modified. Executing callback...")
                callback()
                last_modified_time = current_modified_time
            time.sleep(3)
if __name__ == "__main__":
    # Only continue when running as uid 0 with podman's rootless marker
    # variable present in the environment; otherwise explain how to start the
    # script and exit with status 1.
    inside_rootless_userns = os.getuid() == 0 and '_CONTAINERS_ROOTLESS_UID' in os.environ
    if not inside_rootless_userns:
        print("This program needs to be root of your user namespace in order to have the correct permissions")
        print("start this program with podman unshare " + sys.argv[0])
        sys.exit(1)

    # Source and destination authorized_keys files; the home directory part is
    # filled in when this template is rendered.
    input_file = "{{ homedir }}/data/git/.ssh/authorized_keys"
    output_file = "{{ homedir }}/.ssh/authorized_keys"

    # Create the destination directory with mode 0700 if it is missing.
    ssh_dir = os.path.dirname(output_file)
    if not os.path.exists(ssh_dir):
        os.makedirs(ssh_dir)
        os.chmod(ssh_dir, 0o700)

    def regenerate():
        """Rebuild the destination file from the current source file."""
        process_file(input_file, output_file)

    # Generate the file once up front, then again on every detected change.
    regenerate()
    watch_file(input_file, regenerate)