Module connpy.grpc_layer.server
Functions
def handle_errors(func)-
Expand source code
def handle_errors(func):
    """Decorate a servicer method so raised exceptions abort the RPC.

    A ``ConnpyError`` is reported to the client with
    ``grpc.StatusCode.INTERNAL``; any other ``Exception`` is reported with
    ``grpc.StatusCode.UNKNOWN``. In both cases the status details are
    ``str(exception)``. ``printer.clear_thread_state()`` always runs once the
    call (or the streamed generator) has finished.

    Generator functions are wrapped by a generator so that streamed items are
    re-yielded one by one; plain functions are wrapped by a plain function.
    ``functools.wraps`` keeps the wrapped method's ``__name__``/``__doc__``.

    Args:
        func: The servicer method to wrap. Its servicer context must be
            passed either as the ``context`` keyword or as the last
            positional argument.

    Returns:
        The wrapping function (a generator function when ``func`` is one).
    """
    import functools
    import inspect

    def _abort(args, kwargs, code, exc):
        # The servicer context is the ``context`` keyword when given,
        # otherwise the last positional argument of the handler call.
        context = kwargs.get("context") or args[-1]
        context.abort(code, str(exc))

    # NOTE(review): handlers that call context.abort() themselves presumably
    # raise a plain Exception that lands in the generic branch below and gets
    # aborted again as UNKNOWN -- confirm against the grpc abort() contract.
    if inspect.isgeneratorfunction(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                for item in func(*args, **kwargs):
                    yield item
            except ConnpyError as e:
                _abort(args, kwargs, grpc.StatusCode.INTERNAL, e)
            except Exception as e:
                _abort(args, kwargs, grpc.StatusCode.UNKNOWN, e)
            finally:
                printer.clear_thread_state()
        return wrapper
    else:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except ConnpyError as e:
                _abort(args, kwargs, grpc.StatusCode.INTERNAL, e)
            except Exception as e:
                _abort(args, kwargs, grpc.StatusCode.UNKNOWN, e)
            finally:
                printer.clear_thread_state()
        return wrapper
Expand source code
def serve(config, port=8048, debug=False):
    """Build, start and return the connpy gRPC server.

    Args:
        config: Configuration object handed to every servicer constructor.
        port: TCP port to listen on (bound insecurely on ``[::]``).
        debug: When true, a ``LoggingInterceptor`` is installed.

    Returns:
        The started ``grpc`` server object.
    """
    interceptors = []
    if debug:
        interceptors.append(LoggingInterceptor())

    pool = futures.ThreadPoolExecutor(max_workers=10)
    server = grpc.server(pool, interceptors=interceptors)

    node_servicer = NodeServicer(config)
    connpy_pb2_grpc.add_NodeServiceServicer_to_server(node_servicer, server)

    profile_servicer = ProfileServicer(config)
    connpy_pb2_grpc.add_ProfileServiceServicer_to_server(profile_servicer, server)

    config_servicer = ConfigServicer(config)
    connpy_pb2_grpc.add_ConfigServiceServicer_to_server(config_servicer, server)

    # One PluginServicer instance backs both plugin-related services.
    plugin_servicer = PluginServicer(config)
    connpy_pb2_grpc.add_PluginServiceServicer_to_server(plugin_servicer, server)
    remote_plugin_pb2_grpc.add_RemotePluginServiceServicer_to_server(plugin_servicer, server)

    execution_servicer = ExecutionServicer(config)
    connpy_pb2_grpc.add_ExecutionServiceServicer_to_server(execution_servicer, server)

    import_export_servicer = ImportExportServicer(config)
    connpy_pb2_grpc.add_ImportExportServiceServicer_to_server(import_export_servicer, server)

    ai_servicer = AIServicer(config)
    connpy_pb2_grpc.add_AIServiceServicer_to_server(ai_servicer, server)

    system_servicer = SystemServicer(config)
    connpy_pb2_grpc.add_SystemServiceServicer_to_server(system_servicer, server)

    listen_address = f'[::]:{port}'
    server.add_insecure_port(listen_address)
    server.start()
    return server
Classes
class AIServicer (config)-
Expand source code
class AIServicer(connpy_pb2_grpc.AIServiceServicer):
    """gRPC servicer that forwards AI-related RPCs to an ``AIService``."""

    def __init__(self, config):
        # Every RPC below delegates to this service object.
        self.service = AIService(config)

    @handle_errors
    def ask(self, request_iterator, context):
        """Stream an AI conversation between the client and ``AIService.ask``.

        Incoming requests are consumed on a background listener thread; each
        request carrying ``input_text`` starts a further thread that runs the
        AI call. Both threads feed ``chunk_queue``, which this generator
        drains and translates into ``AIResponse`` messages. The stream ends
        when the listener pushes the ``(None, None)`` sentinel, i.e. once the
        request iterator is exhausted or fails.

        Args:
            request_iterator: Iterator of client requests (fields read here:
                ``interrupt``, ``confirmation_answer``, ``input_text``,
                ``session_id``, ``debug``, ``trust``, ``engineer_model``,
                ``engineer_api_key``).
            context: Servicer context (used by ``handle_errors``).

        Yields:
            ``connpy_pb2.AIResponse`` messages.
        """
        import queue
        import threading

        # Items are (msg_type, value) tuples produced by the worker threads.
        chunk_queue = queue.Queue()
        # Confirmation answers from the client, consumed by StatusBridge.confirm.
        request_queue = queue.Queue()
        bridge = None
        # Chat history carried over between messages of the same stream.
        history = []
        is_web = False

        # Most recently started AI worker thread.
        # NOTE(review): agent_instance is never assigned or read in this
        # method beyond the nonlocal declarations -- looks unused.
        ai_thread = None
        agent_instance = None

        def callback(chunk):
            # Forward each streamed text chunk to the response loop.
            chunk_queue.put(("text", chunk))

        def run_ai_task(input_text, session_id, debug, overrides, trust):
            nonlocal history, bridge, agent_instance
            try:
                # Run the AI interaction (this blocks this specific thread)
                res = self.service.ask(
                    input_text,
                    chat_history=history if history else None,
                    session_id=session_id,
                    debug=debug,
                    status=bridge,
                    console=bridge,
                    confirm_handler=bridge.confirm,
                    chunk_callback=callback,
                    trust=trust,
                    **overrides
                )
                # Update history for next message
                if "chat_history" in res:
                    history = res["chat_history"]

                # Send final chunk marker
                chunk_queue.put(("final_mark", res))
            except Exception as e:
                # Errors are reported to the client as a status message; no
                # "final_mark" item is queued on this path.
                import traceback
                print(f"AI Task Error: {e}")
                traceback.print_exc()
                chunk_queue.put(("status", f"Error: {str(e)}"))

        def request_listener():
            nonlocal bridge, is_web, ai_thread, agent_instance
            try:
                for req in request_iterator:
                    if req.interrupt:
                        # NOTE(review): StatusBridge.thread is not assigned
                        # anywhere in this method, so the interrupt hook may
                        # be a no-op unless something else sets it -- confirm.
                        if bridge and bridge.on_interrupt:
                            bridge.on_interrupt()
                        continue

                    if req.confirmation_answer:
                        # Hand the answer to whoever is blocked in bridge.confirm().
                        request_queue.put(req)
                        continue

                    if req.input_text:
                        # A session id containing "web" or starting with "ws-"
                        # (case-insensitive) marks a web client.
                        is_web = "web" in (req.session_id or "").lower() or (req.session_id or "").lower().startswith("ws-")
                        if not bridge:
                            bridge = StatusBridge(chunk_queue, request_queue=request_queue, is_web=is_web)

                        # Optional per-request overrides forwarded to AIService.ask.
                        overrides = {}
                        if req.engineer_model:
                            overrides["engineer_model"] = req.engineer_model
                        if req.engineer_api_key:
                            overrides["engineer_api_key"] = req.engineer_api_key

                        # Start AI in its own thread so we can keep listening for interrupts
                        ai_thread = threading.Thread(
                            target=run_ai_task,
                            args=(req.input_text, req.session_id, req.debug, overrides, req.trust),
                            daemon=True
                        )
                        ai_thread.start()
            except grpc.RpcError:
                pass
            except Exception as e:
                print(f"Request Listener Error: {e}")
            finally:
                # When client closes stream, send sentinel
                chunk_queue.put((None, None))

        # Start listening for client requests/signals
        threading.Thread(target=request_listener, daemon=True).start()

        # Main response loop (yields to gRPC)
        while True:
            item = chunk_queue.get()
            if item == (None, None):
                break

            msg_type, val = item
            if msg_type == "text":
                yield connpy_pb2.AIResponse(text_chunk=val, is_final=False)
            elif msg_type == "status":
                # Web clients do not receive "is thinking" status updates.
                if is_web and "is thinking" in val.lower():
                    continue
                # Strip the [ai_status] markup tags before sending.
                clean_val = val.replace("[ai_status]", "").replace("[/ai_status]", "")
                yield connpy_pb2.AIResponse(status_update=clean_val, is_final=False)
            elif msg_type == "debug":
                yield connpy_pb2.AIResponse(debug_message=val, is_final=False)
            elif msg_type == "important":
                yield connpy_pb2.AIResponse(important_message=val, is_final=False)
            elif msg_type == "confirm":
                yield connpy_pb2.AIResponse(status_update=val, requires_confirmation=True, is_final=False)
            elif msg_type == "final_mark":
                yield connpy_pb2.AIResponse(is_final=True, full_result=to_struct(val))

    @handle_errors
    def confirm(self, request, context):
        """Forward ``request.value`` to ``AIService.confirm`` and return the result as a ``BoolResponse``."""
        res = self.service.confirm(request.value)
        return connpy_pb2.BoolResponse(value=res)

    @handle_errors
    def list_sessions(self, request, context):
        """Return ``AIService.list_sessions()`` wrapped in a ``ValueResponse``."""
        return connpy_pb2.ValueResponse(data=to_value(self.service.list_sessions()))

    @handle_errors
    def delete_session(self, request, context):
        """Delete the session identified by ``request.value``."""
        self.service.delete_session(request.value)
        return Empty()

    @handle_errors
    def configure_provider(self, request, context):
        """Pass provider, model and API key from the request to ``AIService.configure_provider``."""
        self.service.configure_provider(request.provider, request.model, request.api_key)
        return Empty()

    @handle_errors
    def load_session_data(self, request, context):
        """Return the data of session ``request.value`` wrapped in a ``StructResponse``."""
        return connpy_pb2.StructResponse(data=to_struct(self.service.load_session_data(request.value)))
Ancestors
Inherited members
class ConfigServicer (config)-
Expand source code
class ConfigServicer(connpy_pb2_grpc.ConfigServiceServicer):
    """gRPC servicer that forwards configuration RPCs to a ``ConfigService``."""

    def __init__(self, config):
        self.service = ConfigService(config)

    @handle_errors
    def get_settings(self, request, context):
        """Return the service's settings as a ``StructResponse``."""
        settings = self.service.get_settings()
        return connpy_pb2.StructResponse(data=to_struct(settings))

    @handle_errors
    def get_default_dir(self, request, context):
        """Return the service's default directory as a ``StringResponse``."""
        default_dir = self.service.get_default_dir()
        return connpy_pb2.StringResponse(value=default_dir)

    @handle_errors
    def set_config_folder(self, request, context):
        """Pass ``request.value`` to ``ConfigService.set_config_folder``."""
        folder = request.value
        self.service.set_config_folder(folder)
        return Empty()

    @handle_errors
    def update_setting(self, request, context):
        """Update setting ``request.key`` with the decoded ``request.value``."""
        key = request.key
        decoded = from_value(request.value)
        self.service.update_setting(key, decoded)
        return Empty()

    @handle_errors
    def encrypt_password(self, request, context):
        """Return ``ConfigService.encrypt_password(request.value)`` as a ``StringResponse``."""
        encrypted = self.service.encrypt_password(request.value)
        return connpy_pb2.StringResponse(value=encrypted)

    @handle_errors
    def apply_theme_from_file(self, request, context):
        """Return the result of ``apply_theme_from_file(request.value)`` as a ``StructResponse``."""
        outcome = self.service.apply_theme_from_file(request.value)
        return connpy_pb2.StructResponse(data=to_struct(outcome))
Ancestors
Inherited members
class ExecutionServicer (config)-
Expand source code
class ExecutionServicer(connpy_pb2_grpc.ExecutionServiceServicer):
    """gRPC servicer that forwards command-execution RPCs to an ``ExecutionService``."""

    def __init__(self, config):
        self.service = ExecutionService(config)

    @handle_errors
    def run_commands(self, request, context):
        """Run commands on the requested nodes and stream one result per node.

        The service call runs on a background thread; its per-node callback
        pushes results onto a queue that this generator drains.

        Yields:
            ``connpy_pb2.NodeRunResult`` for every completed node.

        Raises:
            Exception: Any exception raised by the worker thread is re-raised
                here so the ``handle_errors`` wrapper can report it.
        """
        import queue
        import threading

        # A single requested node is passed as a scalar, otherwise as a list.
        nodes_filter = request.nodes[0] if len(request.nodes) == 1 else list(request.nodes)

        q = queue.Queue()

        def _on_complete(unique, output, status):
            q.put({"unique_id": unique, "output": output, "status": status})

        def _worker():
            try:
                # Log the task name to the printer console when one was given
                if request.name:
                    printer.console.print(f"[debug][DEBUG][/debug] Executing task: [bold cyan]{request.name}[/bold cyan]")

                self.service.run_commands(
                    nodes_filter=nodes_filter,
                    commands=list(request.commands),
                    folder=request.folder if request.folder else None,
                    prompt=request.prompt if request.prompt else None,
                    parallel=request.parallel,
                    # Non-positive timeouts fall back to 10.
                    timeout=request.timeout if request.timeout > 0 else 10,
                    variables=from_struct(request.vars) if request.HasField("vars") else None,
                    on_node_complete=_on_complete,
                    name=request.name if request.name else None
                )
            except Exception as e:
                # Exceptions raised on this thread cannot reach context.abort
                # directly, so hand them to the generator, which re-raises
                # them under the handle_errors decorator.
                q.put(e)
            finally:
                # None is the end-of-stream sentinel; it is queued after any error.
                q.put(None)

        threading.Thread(target=_worker, daemon=True).start()

        while True:
            item = q.get()
            if item is None:
                break
            if isinstance(item, Exception):
                raise item
            yield connpy_pb2.NodeRunResult(
                unique_id=item["unique_id"],
                output=item["output"],
                status=item["status"]
            )

    @handle_errors
    def test_commands(self, request, context):
        """Run commands with expected values on the requested nodes and stream results.

        Same queue/worker hand-off as ``run_commands``; the per-node callback
        additionally receives a test result, which is copied into
        ``test_result`` when it is not ``None``.

        Yields:
            ``connpy_pb2.NodeRunResult`` for every completed node.

        Raises:
            Exception: Any exception raised by the worker thread is re-raised here.
        """
        import queue
        import threading

        # A single requested node is passed as a scalar, otherwise as a list.
        nodes_filter = request.nodes[0] if len(request.nodes) == 1 else list(request.nodes)

        q = queue.Queue()

        def _on_complete(unique, node_output, node_status, node_result):
            q.put({"unique_id": unique, "output": node_output, "status": node_status, "result": node_result})

        def _worker():
            try:
                # Log the task name to the printer console when one was given
                if request.name:
                    printer.console.print(f"[debug][DEBUG][/debug] Executing task: [bold cyan]{request.name}[/bold cyan]")

                self.service.test_commands(
                    nodes_filter=nodes_filter,
                    commands=list(request.commands),
                    expected=list(request.expected),
                    folder=request.folder if request.folder else None,
                    prompt=request.prompt if request.prompt else None,
                    parallel=request.parallel,
                    # Non-positive timeouts fall back to 10.
                    timeout=request.timeout if request.timeout > 0 else 10,
                    variables=from_struct(request.vars) if request.HasField("vars") else None,
                    on_node_complete=_on_complete,
                    name=request.name if request.name else None
                )
            except Exception as e:
                # Forward the failure to the generator, which re-raises it.
                q.put(e)
            finally:
                # None is the end-of-stream sentinel; it is queued after any error.
                q.put(None)

        threading.Thread(target=_worker, daemon=True).start()

        while True:
            item = q.get()
            if item is None:
                break
            if isinstance(item, Exception):
                raise item

            res = connpy_pb2.NodeRunResult(
                unique_id=item["unique_id"],
                output=item["output"],
                status=item["status"]
            )
            if item["result"] is not None:
                res.test_result.CopyFrom(to_struct(item["result"]))
            yield res

    @handle_errors
    def run_cli_script(self, request, context):
        """Forward ``param1``, ``param2`` and ``parallel`` to ``run_cli_script`` and return the result as a ``StructResponse``."""
        res = self.service.run_cli_script(request.param1, request.param2, request.parallel)
        return connpy_pb2.StructResponse(data=to_struct(res))

    @handle_errors
    def run_yaml_playbook(self, request, context):
        """Forward ``param1`` and ``parallel`` to ``run_yaml_playbook`` and return the result as a ``StructResponse``."""
        res = self.service.run_yaml_playbook(request.param1, request.parallel)
        return connpy_pb2.StructResponse(data=to_struct(res))
Ancestors
Inherited members
class ImportExportServicer (config)-
Expand source code
class ImportExportServicer(connpy_pb2_grpc.ImportExportServiceServicer):
    """gRPC servicer that forwards import/export RPCs to an ``ImportExportService``.

    A ``NodeService`` is kept alongside it so the node cache can be
    regenerated after operations that change the inventory.
    """

    def __init__(self, config):
        self.service = ImportExportService(config)
        self.node_service = NodeService(config)

    @handle_errors
    def export_to_file(self, request, context):
        """Export to ``request.file_path``, limited to ``request.folders`` when any are given."""
        if request.folders:
            folders = list(request.folders)
        else:
            folders = None
        self.service.export_to_file(request.file_path, folders)
        return Empty()

    @handle_errors
    def import_from_file(self, request, context):
        """Import either inline YAML content or a file path, then rebuild the cache.

        A ``request.value`` starting with the ``---YAML---`` marker line is
        treated as YAML text and imported from the parsed data; any other
        value is passed to ``import_from_file`` unchanged.
        """
        payload = request.value
        if not payload.startswith("---YAML---\n"):
            self.service.import_from_file(payload)
        else:
            import yaml
            yaml_text = payload[len("---YAML---\n"):]
            # NOTE(review): this parses client-supplied text with FullLoader;
            # consider yaml.safe_load if the payload can be untrusted.
            parsed = yaml.load(yaml_text, Loader=yaml.FullLoader)
            self.service.import_from_dict(parsed)
        self.node_service.generate_cache()
        return Empty()

    @handle_errors
    def set_reserved_names(self, request, context):
        """Set the reserved names from ``request.items`` and rebuild the cache."""
        names = list(request.items)
        self.service.set_reserved_names(names)
        self.node_service.generate_cache()
        return Empty()
Ancestors
Inherited members
class LoggingInterceptor-
Expand source code
class LoggingInterceptor(grpc.ServerInterceptor):
    """Server interceptor that logs every incoming RPC to a Rich console.

    Affords intercepting incoming RPCs on the service-side.
    """

    def __init__(self):
        from rich.console import Console
        from ..printer import connpy_theme, get_original_stdout
        # Log to the stream returned by get_original_stdout().
        self.console = Console(theme=connpy_theme, file=get_original_stdout())

    def intercept_service(self, continuation, handler_call_details):
        """Log the incoming method, delegate to ``continuation`` and log the elapsed time.

        The measured duration covers only the ``continuation`` call made
        here, not anything that happens after the handler is returned.

        Args:
            continuation: Callable taking the call details and returning the
                next handler in the chain.
            handler_call_details: Object describing the RPC; its ``method``
                attribute is used in the log lines.

        Returns:
            Whatever ``continuation`` returns.

        Raises:
            Exception: Anything raised by ``continuation`` is logged and re-raised.
        """
        import time

        method = handler_call_details.method
        self.console.print(f"[debug][DEBUG][/debug] gRPC Incoming Request: [bold cyan]{method}[/bold cyan]")

        # perf_counter is monotonic, so the elapsed time cannot be skewed by
        # wall-clock adjustments the way time.time() differences can.
        start_time = time.perf_counter()
        try:
            result = continuation(handler_call_details)
        except Exception as e:
            self.console.print(f"[debug][DEBUG][/debug] [bold red]ERROR[/bold red] in {method}: {e}")
            # Bare raise re-raises the active exception unchanged.
            raise
        finally:
            duration = (time.perf_counter() - start_time) * 1000
            self.console.print(f"[debug][DEBUG][/debug] Completed [bold cyan]{method}[/bold cyan] in {duration:.2f}ms")
        return result
Ancestors
- grpc.ServerInterceptor
- abc.ABC
Methods
def intercept_service(self, continuation, handler_call_details)-
Expand source code
def intercept_service(self, continuation, handler_call_details):
    """Intercepts incoming RPCs before handing them over to a handler.

    Logs the incoming method name, delegates to ``continuation`` and logs how
    long that call took. The measured duration covers only the
    ``continuation`` call made here.

    Args:
        continuation: Callable taking the call details and returning the next
            handler in the chain.
        handler_call_details: Object describing the RPC; its ``method``
            attribute is used in the log lines.

    Returns:
        Whatever ``continuation`` returns.

    Raises:
        Exception: Anything raised by ``continuation`` is logged and re-raised.
    """
    import time

    method = handler_call_details.method
    self.console.print(f"[debug][DEBUG][/debug] gRPC Incoming Request: [bold cyan]{method}[/bold cyan]")

    # perf_counter is monotonic, so the elapsed time cannot be skewed by
    # wall-clock adjustments the way time.time() differences can.
    start_time = time.perf_counter()
    try:
        result = continuation(handler_call_details)
    except Exception as e:
        self.console.print(f"[debug][DEBUG][/debug] [bold red]ERROR[/bold red] in {method}: {e}")
        # Bare raise re-raises the active exception unchanged.
        raise
    finally:
        duration = (time.perf_counter() - start_time) * 1000
        self.console.print(f"[debug][DEBUG][/debug] Completed [bold cyan]{method}[/bold cyan] in {duration:.2f}ms")
    return result
State can be passed from an interceptor to downstream interceptors via contextvars. The first interceptor is called from an empty contextvars.Context, and the same Context is used for downstream interceptors and for the final handler call. Note that there are no guarantees that interceptors and handlers will be called from the same thread.
Args
continuation- A function that takes a HandlerCallDetails and proceeds to invoke the next interceptor in the chain, if any, or the RPC handler lookup logic, with the call details passed as an argument, and returns an RpcMethodHandler instance if the RPC is considered serviced, or None otherwise.
handler_call_details- A HandlerCallDetails describing the RPC.
Returns
An RpcMethodHandler with which the RPC may be serviced if the interceptor chooses to service this RPC, or None otherwise.
class NodeServicer (config)-
Expand source code
class NodeServicer(connpy_pb2_grpc.NodeServiceServicer):
    """gRPC servicer that forwards node/inventory RPCs to a ``NodeService``."""

    def __init__(self, config):
        self.service = NodeService(config)

    @handle_errors
    def interact_node(self, request_iterator, context):
        """Open an interactive session to a node and stream its output.

        The first request on the stream is a setup packet (fields read here:
        ``id``, ``sftp``, ``debug``, ``connection_params_json``, ``cols``,
        ``rows``). The node is built either from ``connection_params_json``
        (optionally on top of a configured ``base_node``) or from the
        configured node ``id``. After a successful connect, the interaction
        loop runs on a background thread and its output is relayed from a
        queue until a ``None`` end marker arrives.

        Yields:
            ``connpy_pb2.InteractResponse``: first a success/failure message,
            then one message per chunk of ``stdout_data``.
        """
        # NOTE(review): sys and os are not referenced in this method.
        import sys
        import os
        import asyncio
        from connpy.core import node
        from ..services.profile_service import ProfileService
        from connpy.tunnels import RemoteStream
        import queue
        import threading

        # Fetch first setup packet
        try:
            first_req = next(request_iterator)
        except StopIteration:
            # NOTE(review): the code below relies on abort() not returning,
            # otherwise first_req would be unbound -- confirm.
            context.abort(grpc.StatusCode.INVALID_ARGUMENT, "No setup request received")

        unique_id = first_req.id
        sftp = first_req.sftp
        debug = first_req.debug

        printer.console.print(f"[debug][DEBUG][/debug] gRPC interact_node request for: [bold cyan]{unique_id}[/bold cyan]")

        if first_req.connection_params_json:
            import json
            params = json.loads(first_req.connection_params_json)
            base_node_id = params.get("base_node")

            # Valid attributes that a node object accepts
            valid_attrs = ['host', 'options', 'logs', 'password', 'port', 'protocol', 'user', 'jumphost']

            # Name used when the params carry no explicit "name".
            fallback_id = f"{unique_id}@remote"
            if unique_id == "dynamic" and params.get("host"):
                fallback_id = f"dynamic-{params.get('host')}@remote"

            if base_node_id:
                # Look up the base node in config and use its full data
                nodes = self.service.config._getallnodes(base_node_id)
                if nodes:
                    device = self.service.config.getitem(nodes[0])
                    # Override device properties with any passed in params
                    for attr in valid_attrs:
                        if attr in params:
                            device[attr] = params[attr]

                    # Merge passed tags into the base node's tags (a non-dict
                    # existing value is replaced by an empty dict first).
                    if "tags" in params:
                        device_tags = device.get("tags", {})
                        if not isinstance(device_tags, dict):
                            device_tags = {}
                        device_tags.update(params["tags"])
                        device["tags"] = device_tags

                    node_name = params.get("name", base_node_id)
                    n = node(node_name, **device, config=self.service.config)
                else:
                    # base_node not found, fall back to dynamic
                    node_name = params.get("name", fallback_id)
                    n = node(node_name, host=params.get("host", ""), config=self.service.config)
                    for attr in valid_attrs:
                        if attr in params:
                            setattr(n, attr, params[attr])
                    if "tags" in params:
                        n.tags = params["tags"]
            else:
                # No base node requested: build the node purely from params.
                node_name = params.get("name", fallback_id)
                n = node(node_name, host=params.get("host", ""), config=self.service.config)
                for attr in valid_attrs:
                    if attr in params:
                        setattr(n, attr, params[attr])
                if "tags" in params:
                    n.tags = params["tags"]
        else:
            # No connection params: use the configured node, resolved through
            # ProfileService.resolve_node_data.
            node_data = self.service.config.getitem(unique_id, extract=False)
            if not node_data:
                context.abort(grpc.StatusCode.NOT_FOUND, f"Node {unique_id} not found")

            profile_service = ProfileService(self.service.config)
            resolved_data = profile_service.resolve_node_data(node_data)
            n = node(unique_id, **resolved_data, config=self.service.config)

        if sftp:
            n.protocol = "sftp"

        # Any result other than True is sent to the client as the error text.
        connect = n._connect(debug=debug)
        if connect != True:
            yield connpy_pb2.InteractResponse(success=False, error_message=str(connect))
            return

        # Signal successful connection to the client
        yield connpy_pb2.InteractResponse(success=True)

        # Set initial window size if provided
        if first_req.cols > 0 and first_req.rows > 0:
            try:
                n.child.setwinsize(first_req.rows, first_req.cols)
            except Exception:
                # Best effort: a failed resize does not end the session.
                pass

        response_queue = queue.Queue()
        remote_stream = RemoteStream(request_iterator, response_queue)

        def run_async_loop():
            try:
                n._setup_interact_environment(debug=debug, logger=None, async_mode=True)

                def resize_callback(rows, cols):
                    try:
                        n.child.setwinsize(rows, cols)
                    except Exception:
                        pass

                asyncio.run(n._async_interact_loop(remote_stream, resize_callback))
            except Exception as e:
                # NOTE(review): errors from the interaction loop are dropped
                # silently here; the session simply ends.
                pass
            finally:
                n._teardown_interact_environment()
                response_queue.put(None) # Signal EOF

        t_loop = threading.Thread(target=run_async_loop, daemon=True)
        t_loop.start()

        # Relay queued output until the None end marker arrives.
        while True:
            data = response_queue.get()
            if data is None:
                printer.console.print(f"[debug][DEBUG][/debug] gRPC interact_node session closed for: [bold cyan]{unique_id}[/bold cyan]")
                break
            yield connpy_pb2.InteractResponse(stdout_data=data)

    @handle_errors
    def list_nodes(self, request, context):
        """Return ``list_nodes`` for the optional filter/format strings as a ``ValueResponse``."""
        # Empty strings are passed to the service as None.
        f = request.filter_str if request.filter_str else None
        fmt = request.format_str if request.format_str else None
        return connpy_pb2.ValueResponse(data=to_value(self.service.list_nodes(f, fmt)))

    @handle_errors
    def list_folders(self, request, context):
        """Return ``list_folders`` for the optional filter string as a ``ValueResponse``."""
        f = request.filter_str if request.filter_str else None
        return connpy_pb2.ValueResponse(data=to_value(self.service.list_folders(f)))

    @handle_errors
    def get_node_details(self, request, context):
        """Return the details of node ``request.id`` as a ``StructResponse``."""
        return connpy_pb2.StructResponse(data=to_struct(self.service.get_node_details(request.id)))

    @handle_errors
    def explode_unique(self, request, context):
        """Return ``explode_unique(request.id)`` as a ``ValueResponse``."""
        return connpy_pb2.ValueResponse(data=to_value(self.service.explode_unique(request.id)))

    @handle_errors
    def validate_parent_folder(self, request, context):
        """Call ``validate_parent_folder(request.id)``; failures surface through ``handle_errors``."""
        self.service.validate_parent_folder(request.id)
        return Empty()

    @handle_errors
    def generate_cache(self, request, context):
        """Regenerate the node cache."""
        self.service.generate_cache()
        return Empty()

    @handle_errors
    def add_node(self, request, context):
        """Add a node or folder from the request, then regenerate the cache."""
        self.service.add_node(request.id, from_struct(request.data), request.is_folder)
        self.service.generate_cache()
        return Empty()

    @handle_errors
    def update_node(self, request, context):
        """Update node ``request.id`` with the decoded data, then regenerate the cache."""
        self.service.update_node(request.id, from_struct(request.data))
        self.service.generate_cache()
        return Empty()

    @handle_errors
    def delete_node(self, request, context):
        """Delete node or folder ``request.id``, then regenerate the cache."""
        self.service.delete_node(request.id, request.is_folder)
        self.service.generate_cache()
        return Empty()

    @handle_errors
    def move_node(self, request, context):
        """Call ``move_node(src_id, dst_id, copy)`` from the request, then regenerate the cache."""
        self.service.move_node(request.src_id, request.dst_id, request.copy)
        self.service.generate_cache()
        return Empty()

    @handle_errors
    def bulk_add(self, request, context):
        """Call ``bulk_add`` with the request's ids, hosts and common data, then regenerate the cache."""
        self.service.bulk_add(list(request.ids), list(request.hosts), from_struct(request.common_data))
        self.service.generate_cache()
        return Empty()

    @handle_errors
    def set_reserved_names(self, request, context):
        """Set the reserved names from ``request.items``, then regenerate the cache."""
        self.service.set_reserved_names(list(request.items))
        self.service.generate_cache()
        return Empty()

    @handle_errors
    def full_replace(self, request, context):
        """Call ``full_replace`` with the decoded connections and profiles, then regenerate the cache."""
        connections = from_struct(request.connections)
        profiles = from_struct(request.profiles)
        self.service.full_replace(connections, profiles)
        self.service.generate_cache()
        return Empty()

    @handle_errors
    def get_inventory(self, request, context):
        """Return the inventory's connections and profiles as a ``FullReplaceRequest``."""
        data = self.service.get_inventory()
        return connpy_pb2.FullReplaceRequest(
            connections=to_struct(data["connections"]),
            profiles=to_struct(data["profiles"])
        )
Ancestors
Inherited members
class PluginServicer (config)-
Expand source code
class PluginServicer(connpy_pb2_grpc.PluginServiceServicer, remote_plugin_pb2_grpc.RemotePluginServiceServicer):
    """gRPC servicer for both plugin services, backed by one ``PluginService``."""

    def __init__(self, config):
        self.service = PluginService(config)

    @handle_errors
    def list_plugins(self, request, context):
        """Return ``PluginService.list_plugins()`` as a ``ValueResponse``."""
        plugins = self.service.list_plugins()
        return connpy_pb2.ValueResponse(data=to_value(plugins))

    @handle_errors
    def add_plugin(self, request, context):
        """Add a plugin from inline content or from a source file reference.

        A ``request.source_file`` starting with the ``---CONTENT---`` marker
        line carries the plugin source itself; the rest is encoded to bytes
        and added directly. Otherwise the value is passed on as-is.
        """
        if not request.source_file.startswith("---CONTENT---\n"):
            self.service.add_plugin(request.name, request.source_file, request.update)
        else:
            raw_source = request.source_file[len("---CONTENT---\n"):]
            payload = raw_source.encode()
            self.service.add_plugin_from_bytes(request.name, payload, request.update)
        return Empty()

    @handle_errors
    def delete_plugin(self, request, context):
        """Delete plugin ``request.id``."""
        plugin_id = request.id
        self.service.delete_plugin(plugin_id)
        return Empty()

    @handle_errors
    def enable_plugin(self, request, context):
        """Enable plugin ``request.id``."""
        plugin_id = request.id
        self.service.enable_plugin(plugin_id)
        return Empty()

    @handle_errors
    def disable_plugin(self, request, context):
        """Disable plugin ``request.id``."""
        plugin_id = request.id
        self.service.disable_plugin(plugin_id)
        return Empty()

    @handle_errors
    def get_plugin_source(self, request, context):
        """Return the source of plugin ``request.id`` as a ``StringResponse``."""
        return remote_plugin_pb2.StringResponse(value=self.service.get_plugin_source(request.id))

    @handle_errors
    def invoke_plugin(self, request, context):
        """Invoke plugin ``request.name`` with JSON-decoded args and stream its output chunks."""
        parsed_args = json.loads(request.args_json)
        output = self.service.invoke_plugin(request.name, parsed_args)
        for piece in output:
            yield remote_plugin_pb2.OutputChunk(text=piece)
Ancestors
Inherited members
class ProfileServicer (config)-
Expand source code
class ProfileServicer(connpy_pb2_grpc.ProfileServiceServicer):
    """gRPC servicer that forwards profile RPCs to a ``ProfileService``.

    A ``NodeService`` is kept alongside it so the node cache can be
    regenerated after profiles change.
    """

    def __init__(self, config):
        self.service = ProfileService(config)
        self.node_service = NodeService(config)

    @handle_errors
    def list_profiles(self, request, context):
        """Return the profiles matching the optional filter as a ``ValueResponse``."""
        # An empty filter string is passed to the service as None.
        pattern = request.filter_str or None
        profiles = self.service.list_profiles(pattern)
        return connpy_pb2.ValueResponse(data=to_value(profiles))

    @handle_errors
    def get_profile(self, request, context):
        """Return profile ``request.name`` (with ``request.resolve``) as a ``StructResponse``."""
        profile = self.service.get_profile(request.name, request.resolve)
        return connpy_pb2.StructResponse(data=to_struct(profile))

    @handle_errors
    def add_profile(self, request, context):
        """Add profile ``request.id`` from the decoded data, then regenerate the node cache."""
        profile_id = request.id
        payload = from_struct(request.data)
        self.service.add_profile(profile_id, payload)
        self.node_service.generate_cache()
        return Empty()

    @handle_errors
    def resolve_node_data(self, request, context):
        """Return ``resolve_node_data`` of the decoded request data as a ``StructResponse``."""
        node_data = from_struct(request.data)
        resolved = self.service.resolve_node_data(node_data)
        return connpy_pb2.StructResponse(data=to_struct(resolved))

    @handle_errors
    def delete_profile(self, request, context):
        """Delete profile ``request.id``, then regenerate the node cache."""
        profile_id = request.id
        self.service.delete_profile(profile_id)
        self.node_service.generate_cache()
        return Empty()

    @handle_errors
    def update_profile(self, request, context):
        """Update profile ``request.id`` with the decoded data, then regenerate the node cache."""
        profile_id = request.id
        payload = from_struct(request.data)
        self.service.update_profile(profile_id, payload)
        self.node_service.generate_cache()
        return Empty()
Ancestors
Inherited members
class StatusBridge (q, request_queue=None, is_web=False)-
Expand source code
class StatusBridge:
    """Adapter that turns status/console/confirm calls into queue messages.

    Messages are put on ``q`` as ``(msg_type, value)`` tuples; confirmation
    answers are read back from ``request_queue``.
    """

    def __init__(self, q, request_queue=None, is_web=False):
        self.q = q
        self.request_queue = request_queue
        self.is_web = is_web
        # Thread targeted by _force_interrupt; None until someone assigns it.
        self.thread = None
        self.on_interrupt = self._force_interrupt

    def _force_interrupt(self):
        """Forcefully raise KeyboardInterrupt in the target thread."""
        if not (self.thread and self.thread.ident):
            return
        # Standard Python trick to raise an exception in a specific thread
        import ctypes
        thread_id = ctypes.c_long(self.thread.ident)
        exc_type = ctypes.py_object(KeyboardInterrupt)
        ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, exc_type)

    def update(self, msg):
        """Queue ``msg`` as a ``"status"`` message."""
        self.q.put(("status", msg))

    def stop(self):
        """Do nothing; present so the bridge can stand in for a status object."""
        return None

    def print(self, *args, **kwargs):
        """Render the arguments with Rich and queue them as a ``"debug"`` message."""
        self._print_to_queue("debug", *args, **kwargs)

    def print_important(self, *args, **kwargs):
        """Render the arguments with Rich and queue them as an ``"important"`` message."""
        self._print_to_queue("important", *args, **kwargs)

    def _print_to_queue(self, msg_type, *args, **kwargs):
        """Render ``args`` through a Rich console and queue non-empty output.

        For web clients, ``Panel`` arguments are replaced by their title (if
        any, followed by a newline) and their inner renderable, and terminal
        escape codes are not forced.
        """
        from rich.console import Console
        from rich.panel import Panel
        from io import StringIO
        from ..printer import connpy_theme

        if self.is_web:
            # Remove Panels to avoid box characters on web, but preserve Title
            renderables = []
            for item in args:
                if not isinstance(item, Panel):
                    renderables.append(item)
                    continue
                inner = item.renderable
                # Prepend the title to the content so it can still be detected
                if item.title:
                    renderables.append(f"{item.title}\n")
                renderables.append(inner)
        else:
            renderables = list(args)

        sink = StringIO()
        # force_terminal=False removes ANSI escape codes for Web
        console = Console(file=sink, force_terminal=not self.is_web, width=100, theme=connpy_theme)
        console.print(*renderables, **kwargs)

        rendered = sink.getvalue().strip()
        if rendered:
            self.q.put((msg_type, rendered))

    def confirm(self, prompt, default="n"):
        """Bridge confirmation to the gRPC client."""
        if not self.request_queue:
            return default

        # Render markup to ANSI for the client
        from rich.console import Console
        from io import StringIO
        from ..printer import connpy_theme

        sink = StringIO()
        console = Console(file=sink, force_terminal=True, theme=connpy_theme)
        console.print(prompt, end="")

        # Send confirmation request to client
        self.q.put(("confirm", sink.getvalue()))

        # Wait for the client to send back the answer via the request stream
        answer = default
        try:
            # Block until we get the next request from the client
            reply = self.request_queue.get()
            if reply and reply.confirmation_answer:
                answer = reply.confirmation_answer
        except Exception:
            pass
        return answer
def confirm(self, prompt, default='n')-
Expand source code
def confirm(self, prompt, default="n"):
    """Bridge confirmation to the gRPC client."""
    if not self.request_queue:
        return default

    # Render markup to ANSI for the client
    from rich.console import Console
    from io import StringIO
    from ..printer import connpy_theme

    sink = StringIO()
    console = Console(file=sink, force_terminal=True, theme=connpy_theme)
    console.print(prompt, end="")

    # Send confirmation request to client
    self.q.put(("confirm", sink.getvalue()))

    # Wait for the client to send back the answer via the request stream
    answer = default
    try:
        # Block until we get the next request from the client
        reply = self.request_queue.get()
        if reply and reply.confirmation_answer:
            answer = reply.confirmation_answer
    except Exception:
        pass
    return answer
def print(self, *args, **kwargs)-
Expand source code
def print(self, *args, **kwargs):
    """Capture Rich output and send it as a debug message."""
    kind = "debug"
    self._print_to_queue(kind, *args, **kwargs)
Expand source code
def print_important(self, *args, **kwargs):
    """Capture Rich output and send it as an important message (always shown)."""
    kind = "important"
    self._print_to_queue(kind, *args, **kwargs)
Expand source code
def stop(self):
    """Do nothing and return ``None``."""
    return None
Expand source code
def update(self, msg):
    """Queue ``msg`` as a ``"status"`` message."""
    message = ("status", msg)
    self.q.put(message)
class SystemServicer (config)-
Expand source code
class SystemServicer(connpy_pb2_grpc.SystemServiceServicer):
    """gRPC servicer that forwards API lifecycle RPCs to a ``SystemService``."""

    def __init__(self, config):
        self.service = SystemService(config)

    @handle_errors
    def start_api(self, request, context):
        """Call ``SystemService.start_api`` with ``request.value``."""
        argument = request.value
        self.service.start_api(argument)
        return Empty()

    @handle_errors
    def debug_api(self, request, context):
        """Call ``SystemService.debug_api`` with ``request.value``."""
        argument = request.value
        self.service.debug_api(argument)
        return Empty()

    @handle_errors
    def stop_api(self, request, context):
        """Call ``SystemService.stop_api``."""
        self.service.stop_api()
        return Empty()

    @handle_errors
    def restart_api(self, request, context):
        """Call ``SystemService.restart_api`` with ``request.value``."""
        argument = request.value
        self.service.restart_api(argument)
        return Empty()

    @handle_errors
    def get_api_status(self, request, context):
        """Return ``SystemService.get_api_status()`` as a ``BoolResponse``."""
        status = self.service.get_api_status()
        return connpy_pb2.BoolResponse(value=status)
Ancestors
Inherited members