Module connpy.grpc.server
Functions
def handle_errors(func)-
Expand source code
def handle_errors(func): def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except ConnpyError as e: context = kwargs.get("context") or args[-1] context.abort(grpc.StatusCode.INTERNAL, str(e)) except Exception as e: context = kwargs.get("context") or args[-1] context.abort(grpc.StatusCode.UNKNOWN, str(e)) return wrapper def serve(config, port=8048, debug=False)-
Expand source code
def serve(config, port=8048, debug=False): interceptors = [LoggingInterceptor()] if debug else [] server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), interceptors=interceptors) connpy_pb2_grpc.add_NodeServiceServicer_to_server(NodeServicer(config), server) connpy_pb2_grpc.add_ProfileServiceServicer_to_server(ProfileServicer(config), server) connpy_pb2_grpc.add_ConfigServiceServicer_to_server(ConfigServicer(config), server) plugin_servicer = PluginServicer(config) connpy_pb2_grpc.add_PluginServiceServicer_to_server(plugin_servicer, server) remote_plugin_pb2_grpc.add_RemotePluginServiceServicer_to_server(plugin_servicer, server) connpy_pb2_grpc.add_ExecutionServiceServicer_to_server(ExecutionServicer(config), server) connpy_pb2_grpc.add_ImportExportServiceServicer_to_server(ImportExportServicer(config), server) connpy_pb2_grpc.add_AIServiceServicer_to_server(AIServicer(config), server) connpy_pb2_grpc.add_SystemServiceServicer_to_server(SystemServicer(config), server) server.add_insecure_port(f'[::]:{port}') server.start() return server
Classes
class AIServicer (config)-
Expand source code
class AIServicer(connpy_pb2_grpc.AIServiceServicer): def __init__(self, config): self.service = AIService(config) @handle_errors def ask(self, request_iterator, context): import queue import threading # In bidirectional mode, the first request contains the query try: first_request = next(request_iterator) except StopIteration: return history = from_value(first_request.chat_history) overrides = {} if first_request.engineer_model: overrides["engineer_model"] = first_request.engineer_model if first_request.engineer_api_key: overrides["engineer_api_key"] = first_request.engineer_api_key if first_request.architect_model: overrides["architect_model"] = first_request.architect_model if first_request.architect_api_key: overrides["architect_api_key"] = first_request.architect_api_key chunk_queue = queue.Queue() request_queue = queue.Queue() bridge = StatusBridge(chunk_queue, request_queue=request_queue) # Start a thread to pull subsequent requests from the client (confirmations) def pull_requests(): try: for req in request_iterator: if req.interrupt and bridge.on_interrupt: bridge.on_interrupt() request_queue.put(req) except Exception: pass finally: request_queue.put(None) threading.Thread(target=pull_requests, daemon=True).start() def callback(chunk): chunk_queue.put(("text", chunk)) result_container = {} def run_ai(): try: res = self.service.ask( first_request.input_text, dryrun=first_request.dryrun, chat_history=history if history else None, session_id=first_request.session_id if first_request.session_id else None, debug=first_request.debug, status=bridge, console=bridge, confirm_handler=bridge.confirm, chunk_callback=callback, trust=first_request.trust, **overrides ) result_container["res"] = res except Exception as e: chunk_queue.put(("status", f"[bold fail]Error: {str(e)}[/bold fail]")) result_container["error"] = e finally: chunk_queue.put(None) # Sentinel t = threading.Thread(target=run_ai, daemon=True) bridge.thread = t t.start() while True: item = chunk_queue.get() if item is None: break msg_type, val = item if msg_type == "text": yield connpy_pb2.AIResponse(text_chunk=val, is_final=False) elif msg_type == "status": yield connpy_pb2.AIResponse(status_update=val, is_final=False) elif msg_type == "debug": yield connpy_pb2.AIResponse(debug_message=val, is_final=False) elif msg_type == "important": yield connpy_pb2.AIResponse(important_message=val, is_final=False) elif msg_type == "confirm": yield connpy_pb2.AIResponse(status_update=val, requires_confirmation=True, is_final=False) if "error" in result_container: raise result_container["error"] yield connpy_pb2.AIResponse( is_final=True, full_result=to_struct(result_container.get("res", {})) ) @handle_errors def confirm(self, request, context): res = self.service.confirm(request.value) return connpy_pb2.BoolResponse(value=res) @handle_errors def list_sessions(self, request, context): return connpy_pb2.ValueResponse(data=to_value(self.service.list_sessions())) @handle_errors def delete_session(self, request, context): self.service.delete_session(request.value) return Empty() @handle_errors def configure_provider(self, request, context): self.service.configure_provider(request.provider, request.model, request.api_key) return Empty() @handle_errors def load_session_data(self, request, context): return connpy_pb2.StructResponse(data=to_struct(self.service.load_session_data(request.value)))Missing associated documentation comment in .proto file.
Ancestors
Inherited members
class ConfigServicer (config)-
Expand source code
class ConfigServicer(connpy_pb2_grpc.ConfigServiceServicer): def __init__(self, config): self.service = ConfigService(config) @handle_errors def get_settings(self, request, context): return connpy_pb2.StructResponse(data=to_struct(self.service.get_settings())) @handle_errors def get_default_dir(self, request, context): return connpy_pb2.StringResponse(value=self.service.get_default_dir()) @handle_errors def set_config_folder(self, request, context): self.service.set_config_folder(request.value) return Empty() @handle_errors def update_setting(self, request, context): self.service.update_setting(request.key, from_value(request.value)) return Empty() @handle_errors def encrypt_password(self, request, context): return connpy_pb2.StringResponse(value=self.service.encrypt_password(request.value)) @handle_errors def apply_theme_from_file(self, request, context): return connpy_pb2.StructResponse(data=to_struct(self.service.apply_theme_from_file(request.value)))Missing associated documentation comment in .proto file.
Ancestors
Inherited members
class ExecutionServicer (config)-
Expand source code
class ExecutionServicer(connpy_pb2_grpc.ExecutionServiceServicer): def __init__(self, config): self.service = ExecutionService(config) @handle_errors def run_commands(self, request, context): import queue import threading nodes_filter = request.nodes[0] if len(request.nodes) == 1 else list(request.nodes) q = queue.Queue() def _on_complete(unique, output, status): q.put({"unique_id": unique, "output": output, "status": status}) def _worker(): try: self.service.run_commands( nodes_filter=nodes_filter, commands=list(request.commands), folder=request.folder if request.folder else None, prompt=request.prompt if request.prompt else None, parallel=request.parallel, variables=from_struct(request.vars) if request.HasField("vars") else None, on_node_complete=_on_complete ) except Exception as e: # Optionally pass error to stream, but handle_errors decorator covers top-level. # However, thread exceptions won't reach context.abort directly. q.put(e) finally: q.put(None) threading.Thread(target=_worker, daemon=True).start() while True: item = q.get() if item is None: break if isinstance(item, Exception): raise item yield connpy_pb2.NodeRunResult( unique_id=item["unique_id"], output=item["output"], status=item["status"] ) @handle_errors def test_commands(self, request, context): import queue import threading nodes_filter = request.nodes[0] if len(request.nodes) == 1 else list(request.nodes) q = queue.Queue() def _on_complete(unique, output, status, result): q.put({"unique_id": unique, "output": output, "status": status, "result": result}) def _worker(): try: self.service.test_commands( nodes_filter=nodes_filter, commands=list(request.commands), expected=request.expected, folder=request.folder if request.folder else None, prompt=request.prompt if request.prompt else None, parallel=request.parallel, variables=from_struct(request.vars) if request.HasField("vars") else None, on_node_complete=_on_complete ) except Exception as e: q.put(e) finally: q.put(None) threading.Thread(target=_worker, daemon=True).start() while True: item = q.get() if item is None: break if isinstance(item, Exception): raise item res = connpy_pb2.NodeRunResult( unique_id=item["unique_id"], output=item["output"], status=item["status"] ) if item["result"] is not None: res.test_result.CopyFrom(to_struct(item["result"])) yield res @handle_errors def run_cli_script(self, request, context): res = self.service.run_cli_script(request.param1, request.param2, request.parallel) return connpy_pb2.StructResponse(data=to_struct(res)) @handle_errors def run_yaml_playbook(self, request, context): res = self.service.run_yaml_playbook(request.param1, request.parallel) return connpy_pb2.StructResponse(data=to_struct(res))Missing associated documentation comment in .proto file.
Ancestors
Inherited members
class ImportExportServicer (config)-
Expand source code
class ImportExportServicer(connpy_pb2_grpc.ImportExportServiceServicer): def __init__(self, config): self.service = ImportExportService(config) self.node_service = NodeService(config) @handle_errors def export_to_file(self, request, context): self.service.export_to_file(request.file_path, list(request.folders) if request.folders else None) return Empty() @handle_errors def import_from_file(self, request, context): if request.value.startswith("---YAML---\n"): import yaml content = request.value[len("---YAML---\n"):] data = yaml.load(content, Loader=yaml.FullLoader) self.service.import_from_dict(data) else: self.service.import_from_file(request.value) self.node_service.generate_cache() return Empty() @handle_errors def set_reserved_names(self, request, context): self.service.set_reserved_names(list(request.items)) self.node_service.generate_cache() return Empty()Missing associated documentation comment in .proto file.
Ancestors
Inherited members
class LoggingInterceptor-
Expand source code
class LoggingInterceptor(grpc.ServerInterceptor): def __init__(self): from rich.console import Console from ..printer import connpy_theme self.console = Console(theme=connpy_theme) def intercept_service(self, continuation, handler_call_details): import time method = handler_call_details.method self.console.print(f"[debug][DEBUG][/debug] gRPC Incoming Request: [bold cyan]{method}[/bold cyan]") start_time = time.time() try: result = continuation(handler_call_details) except Exception as e: self.console.print(f"[debug][DEBUG][/debug] [bold red]ERROR[/bold red] in {method}: {e}") raise e finally: duration = (time.time() - start_time) * 1000 self.console.print(f"[debug][DEBUG][/debug] Completed [bold cyan]{method}[/bold cyan] in {duration:.2f}ms") return resultAffords intercepting incoming RPCs on the service-side.
Ancestors
- grpc.ServerInterceptor
- abc.ABC
Methods
def intercept_service(self, continuation, handler_call_details)-
Expand source code
def intercept_service(self, continuation, handler_call_details): import time method = handler_call_details.method self.console.print(f"[debug][DEBUG][/debug] gRPC Incoming Request: [bold cyan]{method}[/bold cyan]") start_time = time.time() try: result = continuation(handler_call_details) except Exception as e: self.console.print(f"[debug][DEBUG][/debug] [bold red]ERROR[/bold red] in {method}: {e}") raise e finally: duration = (time.time() - start_time) * 1000 self.console.print(f"[debug][DEBUG][/debug] Completed [bold cyan]{method}[/bold cyan] in {duration:.2f}ms") return resultIntercepts incoming RPCs before handing them over to a handler.
State can be passed from an interceptor to downstream interceptors via contextvars. The first interceptor is called from an empty contextvars.Context, and the same Context is used for downstream interceptors and for the final handler call. Note that there are no guarantees that interceptors and handlers will be called from the same thread.
Args
continuation- A function that takes a HandlerCallDetails and proceeds to invoke the next interceptor in the chain, if any, or the RPC handler lookup logic, with the call details passed as an argument, and returns an RpcMethodHandler instance if the RPC is considered serviced, or None otherwise.
handler_call_details- A HandlerCallDetails describing the RPC.
Returns
An RpcMethodHandler with which the RPC may be serviced if the interceptor chooses to service this RPC, or None otherwise.
class NodeServicer (config)-
Expand source code
class NodeServicer(connpy_pb2_grpc.NodeServiceServicer): def __init__(self, config): self.service = NodeService(config) @handle_errors def interact_node(self, request_iterator, context): import sys import select import os from connpy.core import node from ..services.profile_service import ProfileService # Fetch first setup packet try: first_req = next(request_iterator) except StopIteration: context.abort(grpc.StatusCode.INVALID_ARGUMENT, "No setup request received") unique_id = first_req.id sftp = first_req.sftp debug = first_req.debug node_data = self.service.config.getitem(unique_id, extract=False) profile_service = ProfileService(self.service.config) resolved_data = profile_service.resolve_node_data(node_data) n = node(unique_id, **resolved_data, config=self.service.config) if sftp: n.protocol = "sftp" connect = n._connect(debug=debug) if connect != True: context.abort(grpc.StatusCode.INTERNAL, "Failed to connect to node") import threading import queue stdin_queue = queue.Queue() running = True def read_requests(): try: for req in request_iterator: if not running: break if req.cols > 0 and req.rows > 0: try: n.child.setwinsize(req.rows, req.cols) except Exception: pass if req.stdin_data: stdin_queue.put(req.stdin_data) except grpc.RpcError: pass t = threading.Thread(target=read_requests, daemon=True) t.start() # Set initial window size if provided if first_req.cols > 0 and first_req.rows > 0: try: n.child.setwinsize(first_req.rows, first_req.cols) except Exception: pass try: while n.child.isalive() and running: r, _, _ = select.select([n.child.child_fd], [], [], 0.05) if r: try: data = os.read(n.child.child_fd, 4096) if not data: break yield connpy_pb2.InteractResponse(stdout_data=data) except OSError: break while not stdin_queue.empty(): data = stdin_queue.get_nowait() try: os.write(n.child.child_fd, data) except OSError: running = False break finally: running = False try: n.child.terminate(force=True) except Exception: pass @handle_errors def list_nodes(self, request, context): f = request.filter_str if request.filter_str else None fmt = request.format_str if request.format_str else None return connpy_pb2.ValueResponse(data=to_value(self.service.list_nodes(f, fmt))) @handle_errors def list_folders(self, request, context): f = request.filter_str if request.filter_str else None return connpy_pb2.ValueResponse(data=to_value(self.service.list_folders(f))) @handle_errors def get_node_details(self, request, context): return connpy_pb2.StructResponse(data=to_struct(self.service.get_node_details(request.id))) @handle_errors def explode_unique(self, request, context): return connpy_pb2.ValueResponse(data=to_value(self.service.explode_unique(request.id))) @handle_errors def generate_cache(self, request, context): self.service.generate_cache() return Empty() @handle_errors def add_node(self, request, context): self.service.add_node(request.id, from_struct(request.data), request.is_folder) self.service.generate_cache() return Empty() @handle_errors def update_node(self, request, context): self.service.update_node(request.id, from_struct(request.data)) self.service.generate_cache() return Empty() @handle_errors def delete_node(self, request, context): self.service.delete_node(request.id, request.is_folder) self.service.generate_cache() return Empty() @handle_errors def move_node(self, request, context): self.service.move_node(request.src_id, request.dst_id, request.copy) self.service.generate_cache() return Empty() @handle_errors def bulk_add(self, request, context): self.service.bulk_add(list(request.ids), list(request.hosts), from_struct(request.common_data)) self.service.generate_cache() return Empty() @handle_errors def set_reserved_names(self, request, context): self.service.set_reserved_names(list(request.items)) self.service.generate_cache() return Empty() @handle_errors def full_replace(self, request, context): connections = from_struct(request.connections) profiles = from_struct(request.profiles) self.service.full_replace(connections, profiles) self.service.generate_cache() return Empty() @handle_errors def get_inventory(self, request, context): data = self.service.get_inventory() return connpy_pb2.FullReplaceRequest( connections=to_struct(data["connections"]), profiles=to_struct(data["profiles"]) )Missing associated documentation comment in .proto file.
Ancestors
Inherited members
class PluginServicer (config)-
Expand source code
class PluginServicer(connpy_pb2_grpc.PluginServiceServicer, remote_plugin_pb2_grpc.RemotePluginServiceServicer): def __init__(self, config): self.service = PluginService(config) @handle_errors def list_plugins(self, request, context): return connpy_pb2.ValueResponse(data=to_value(self.service.list_plugins())) @handle_errors def add_plugin(self, request, context): if request.source_file.startswith("---CONTENT---\n"): content = request.source_file[len("---CONTENT---\n"):].encode() self.service.add_plugin_from_bytes(request.name, content, request.update) else: self.service.add_plugin(request.name, request.source_file, request.update) return Empty() @handle_errors def delete_plugin(self, request, context): self.service.delete_plugin(request.id) return Empty() @handle_errors def enable_plugin(self, request, context): self.service.enable_plugin(request.id) return Empty() @handle_errors def disable_plugin(self, request, context): self.service.disable_plugin(request.id) return Empty() @handle_errors def get_plugin_source(self, request, context): source = self.service.get_plugin_source(request.id) return remote_plugin_pb2.StringResponse(value=source) @handle_errors def invoke_plugin(self, request, context): args_dict = json.loads(request.args_json) for chunk in self.service.invoke_plugin(request.name, args_dict): yield remote_plugin_pb2.OutputChunk(text=chunk)Missing associated documentation comment in .proto file.
Ancestors
Inherited members
class ProfileServicer (config)-
Expand source code
class ProfileServicer(connpy_pb2_grpc.ProfileServiceServicer): def __init__(self, config): self.service = ProfileService(config) self.node_service = NodeService(config) @handle_errors def list_profiles(self, request, context): f = request.filter_str if request.filter_str else None return connpy_pb2.ValueResponse(data=to_value(self.service.list_profiles(f))) @handle_errors def get_profile(self, request, context): return connpy_pb2.StructResponse(data=to_struct(self.service.get_profile(request.name, request.resolve))) @handle_errors def add_profile(self, request, context): self.service.add_profile(request.id, from_struct(request.data)) self.node_service.generate_cache() return Empty() @handle_errors def resolve_node_data(self, request, context): return connpy_pb2.StructResponse(data=to_struct(self.service.resolve_node_data(from_struct(request.data)))) @handle_errors def delete_profile(self, request, context): self.service.delete_profile(request.id) self.node_service.generate_cache() return Empty() @handle_errors def update_profile(self, request, context): self.service.update_profile(request.id, from_struct(request.data)) self.node_service.generate_cache() return Empty()Missing associated documentation comment in .proto file.
Ancestors
Inherited members
class StatusBridge (q, request_queue=None)-
Expand source code
class StatusBridge: def __init__(self, q, request_queue=None): self.q = q self.request_queue = request_queue self.on_interrupt = self._force_interrupt self.thread = None def _force_interrupt(self): """Forcefully raise KeyboardInterrupt in the target thread.""" if self.thread and self.thread.ident: # Standard Python trick to raise an exception in a specific thread ctypes.pythonapi.PyThreadState_SetAsyncExc( ctypes.c_long(self.thread.ident), ctypes.py_object(KeyboardInterrupt) ) def update(self, msg): self.q.put(("status", msg)) def stop(self): pass def print(self, *args, **kwargs): # Capture Rich output and send as debug message self._print_to_queue("debug", *args, **kwargs) def print_important(self, *args, **kwargs): # Capture Rich output and send as important message (always show) self._print_to_queue("important", *args, **kwargs) def _print_to_queue(self, msg_type, *args, **kwargs): from rich.console import Console from io import StringIO from ..printer import connpy_theme buf = StringIO() # Use a high-quality console for rendering with the app's theme c = Console(file=buf, force_terminal=True, width=100, theme=connpy_theme) c.print(*args, **kwargs) self.q.put((msg_type, buf.getvalue())) def confirm(self, prompt, default="n"): """Bridge confirmation to the gRPC client.""" if not self.request_queue: return default # Render markup to ANSI for the client from rich.console import Console from io import StringIO from ..printer import connpy_theme buf = StringIO() c = Console(file=buf, force_terminal=True, theme=connpy_theme) c.print(prompt, end="") ansi_prompt = buf.getvalue() # Send confirmation request to client self.q.put(("confirm", ansi_prompt)) # Wait for the client to send back the answer via the request stream try: # Block until we get the next request from the client req = self.request_queue.get() if req and req.confirmation_answer: return req.confirmation_answer except Exception: pass return defaultMethods
def confirm(self, prompt, default='n')-
Expand source code
def confirm(self, prompt, default="n"): """Bridge confirmation to the gRPC client.""" if not self.request_queue: return default # Render markup to ANSI for the client from rich.console import Console from io import StringIO from ..printer import connpy_theme buf = StringIO() c = Console(file=buf, force_terminal=True, theme=connpy_theme) c.print(prompt, end="") ansi_prompt = buf.getvalue() # Send confirmation request to client self.q.put(("confirm", ansi_prompt)) # Wait for the client to send back the answer via the request stream try: # Block until we get the next request from the client req = self.request_queue.get() if req and req.confirmation_answer: return req.confirmation_answer except Exception: pass return defaultBridge confirmation to the gRPC client.
def print(self, *args, **kwargs)-
Expand source code
def print(self, *args, **kwargs): # Capture Rich output and send as debug message self._print_to_queue("debug", *args, **kwargs) def print_important(self, *args, **kwargs)-
Expand source code
def print_important(self, *args, **kwargs): # Capture Rich output and send as important message (always show) self._print_to_queue("important", *args, **kwargs) def stop(self)-
Expand source code
def stop(self): pass def update(self, msg)-
Expand source code
def update(self, msg): self.q.put(("status", msg))
class SystemServicer (config)-
Expand source code
class SystemServicer(connpy_pb2_grpc.SystemServiceServicer): def __init__(self, config): self.service = SystemService(config) @handle_errors def start_api(self, request, context): self.service.start_api(request.value) return Empty() @handle_errors def debug_api(self, request, context): self.service.debug_api(request.value) return Empty() @handle_errors def stop_api(self, request, context): self.service.stop_api() return Empty() @handle_errors def restart_api(self, request, context): self.service.restart_api(request.value) return Empty() @handle_errors def get_api_status(self, request, context): return connpy_pb2.BoolResponse(value=self.service.get_api_status())Missing associated documentation comment in .proto file.
Ancestors
Inherited members