Implement hooking system for classes and methods. Cleanup of code

This commit is contained in:
Federico Luzzi 2024-05-03 17:32:45 -03:00
parent 87bb6302ff
commit 1bd9bd62c5
10 changed files with 542 additions and 1914 deletions

View File

@ -169,8 +169,8 @@ The `modify` method allows you to alter instances of a class at the time they ar
connapp.config.modify(modify_config)
```
#### Implementing Hooks with `register_pre_hook` and `register_post_hook`
These methods allow you to define custom logic to be executed before (`register_pre_hook`) or after (`register_post_hook`) the main logic of a method. This is particularly useful for logging, auditing, preprocessing inputs, postprocessing outputs or adding functionalities.
#### Implementing Method Hooks
There are 2 methods that allows you to define custom logic to be executed before (`register_pre_hook`) or after (`register_post_hook`) the main logic of a method. This is particularly useful for logging, auditing, preprocessing inputs, postprocessing outputs or adding functionalities.
- **Usage**: Register hooks to methods to execute additional logic before or after the main method execution.
- **Registration Methods Signature**:
@ -305,7 +305,7 @@ import connpy
conf = connpy.configfile()
organization = 'openai-org'
api_key = "openai-key"
myia = ai(conf, organization, api_key)
myia = connpy.ai(conf, organization, api_key)
input = "go to router 1 and get me the full configuration"
result = myia.ask(input, dryrun = False)
print(result)

View File

@ -149,8 +149,8 @@ The `modify` method allows you to alter instances of a class at the time they ar
connapp.config.modify(modify_config)
```
#### Implementing Hooks with `register_pre_hook` and `register_post_hook`
These methods allow you to define custom logic to be executed before (`register_pre_hook`) or after (`register_post_hook`) the main logic of a method. This is particularly useful for logging, auditing, preprocessing inputs, postprocessing outputs or adding functionalities.
#### Implementing Method Hooks
There are 2 methods that allows you to define custom logic to be executed before (`register_pre_hook`) or after (`register_post_hook`) the main logic of a method. This is particularly useful for logging, auditing, preprocessing inputs, postprocessing outputs or adding functionalities.
- **Usage**: Register hooks to methods to execute additional logic before or after the main method execution.
- **Registration Methods Signature**:
@ -392,7 +392,7 @@ import connpy
conf = connpy.configfile()
organization = 'openai-org'
api_key = "openai-key"
myia = ai(conf, organization, api_key)
myia = connpy.ai(conf, organization, api_key)
input = "go to router 1 and get me the full configuration"
result = myia.ask(input, dryrun = False)
print(result)
@ -413,5 +413,14 @@ __pdoc__ = {
'core': False,
'completion': False,
'api': False,
'plugins': False
'plugins': False,
'core_plugins': False,
'hooks': False,
'connapp.start': False,
'ai.deferred_class_hooks': False,
'configfile.deferred_class_hooks': False,
'node.deferred_class_hooks': False,
'nodes.deferred_class_hooks': False,
'connapp': False,
'connapp.encrypt': True
}

View File

@ -1,2 +1,2 @@
__version__ = "4.0.0b5"
__version__ = "4.0.0"

View File

@ -177,17 +177,6 @@ Categorize the user's request based on the operation they want to perform on the
self.__prompt["confirmation_function"]["parameters"]["properties"]["response"]["type"] = "string"
self.__prompt["confirmation_function"]["parameters"]["required"] = ["result"]
@MethodHook
def process_string(self, s):
if s.startswith('[') and s.endswith(']') and not (s.startswith("['") and s.endswith("']")) and not (s.startswith('["') and s.endswith('"]')):
# Extract the content inside square brackets and split by comma
content = s[1:-1].split(',')
# Add single quotes around each item and join them back together with commas
new_content = ', '.join(f"'{item.strip()}'" for item in content)
# Replace the old content with the new content
s = '[' + new_content + ']'
return s
@MethodHook
def _retry_function(self, function, max_retries, backoff_num, *args):
#Retry openai requests

View File

@ -49,32 +49,45 @@ def _getcwd(words, option, folderonly=False):
return pathstrings
def _get_plugins(which, defaultdir):
enabled_files = []
disabled_files = []
all_files = []
all_plugins = {}
# Iterate over all files in the specified folder
for file in os.listdir(defaultdir + "/plugins"):
# Check if the file is a Python file
if file.endswith('.py'):
enabled_files.append(os.path.splitext(file)[0])
all_plugins[os.path.splitext(file)[0]] = os.path.join(defaultdir + "/plugins", file)
# Check if the file is a Python backup file
elif file.endswith('.py.bkp'):
disabled_files.append(os.path.splitext(os.path.splitext(file)[0])[0])
# Path to core_plugins relative to this script
core_path = os.path.dirname(os.path.realpath(__file__)) + "/core_plugins"
def get_plugins_from_directory(directory):
enabled_files = []
disabled_files = []
all_plugins = {}
# Iterate over all files in the specified folder
if os.path.exists(directory):
for file in os.listdir(directory):
# Check if the file is a Python file
if file.endswith('.py'):
enabled_files.append(os.path.splitext(file)[0])
all_plugins[os.path.splitext(file)[0]] = os.path.join(directory, file)
# Check if the file is a Python backup file
elif file.endswith('.py.bkp'):
disabled_files.append(os.path.splitext(os.path.splitext(file)[0])[0])
return enabled_files, disabled_files, all_plugins
# Get plugins from both directories
user_enabled, user_disabled, user_all_plugins = get_plugins_from_directory(defaultdir + "/plugins")
core_enabled, core_disabled, core_all_plugins = get_plugins_from_directory(core_path)
# Combine the results from user and core plugins
enabled_files = user_enabled
disabled_files = user_disabled
all_plugins = {**user_all_plugins, **core_all_plugins} # Merge dictionaries
# Return based on the command
if which == "--disable":
return enabled_files
elif which == "--enable":
return disabled_files
elif which in ["--del", "--update"]:
all_files.extend(enabled_files)
all_files.extend(disabled_files)
all_files = enabled_files + disabled_files
return all_files
elif which == "all":
return all_plugins
def main():
home = os.path.expanduser("~")
defaultdir = home + '/.config/conn'
@ -144,7 +157,7 @@ def main():
if words[0] in ["--rm", "--del", "-r", "--mod", "--edit", "-e", "--show", "-s", "mv", "move", "cp", "copy"]:
strings.extend(nodes)
if words[0] == "plugin":
strings = ["--help", "--add", "--update", "--del", "--enable", "--disable"]
strings = ["--help", "--add", "--update", "--del", "--enable", "--disable", "--list"]
if words[0] in ["run", "import", "export"]:
strings = ["--help"]
if words[0] == "export":

View File

@ -4,6 +4,7 @@ import json
import os
import re
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP
from pathlib import Path
from copy import deepcopy
from .hooks import MethodHook, ClassHook
@ -392,3 +393,32 @@ class configfile:
nodes.extend(layer3)
return nodes
@MethodHook
def encrypt(self, password, keyfile=None):
'''
Encrypts password using RSA keyfile
### Parameters:
- password (str): Plaintext password to encrypt.
### Optional Parameters:
- keyfile (str): Path/file to keyfile. Default is config keyfile.
### Returns:
str: Encrypted password.
'''
if keyfile is None:
keyfile = self.key
with open(keyfile) as f:
key = RSA.import_key(f.read())
f.close()
publickey = key.publickey()
encryptor = PKCS1_OAEP.new(publickey)
password = encryptor.encrypt(password.encode("utf-8"))
return str(password)

View File

@ -2,8 +2,6 @@
#Imports
import os
import re
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP
import ast
import argparse
import sys
@ -1285,7 +1283,7 @@ class connapp:
passa = inquirer.prompt(passq)
if passa == None:
return False
answer["password"] = self.encrypt(passa["password"])
answer["password"] = self.config.encrypt(passa["password"])
elif answer["password"] == "Profiles":
passq = [(inquirer.Text("password", message="Set a @profile or a comma separated list of @profiles", validate=self._pass_validation))]
passa = inquirer.prompt(passq)
@ -1355,7 +1353,7 @@ class connapp:
return False
if "password" in answer.keys():
if answer["password"] != "":
answer["password"] = self.encrypt(answer["password"])
answer["password"] = self.config.encrypt(answer["password"])
if "tags" in answer.keys() and answer["tags"]:
answer["tags"] = ast.literal_eval(answer["tags"])
result = {**answer, **profile}
@ -1383,7 +1381,7 @@ class connapp:
if answer["password"] == "Local Password":
passq = [inquirer.Password("password", message="Set Password")]
passa = inquirer.prompt(passq)
answer["password"] = self.encrypt(passa["password"])
answer["password"] = self.config.encrypt(passa["password"])
elif answer["password"] == "Profiles":
passq = [(inquirer.Text("password", message="Set a @profile or a comma separated list of @profiles", validate=self._pass_validation))]
passa = inquirer.prompt(passq)
@ -1550,31 +1548,3 @@ tasks:
output: null
...'''
def encrypt(self, password, keyfile=None):
'''
Encrypts password using RSA keyfile
### Parameters:
- password (str): Plaintext password to encrypt.
### Optional Parameters:
- keyfile (str): Path/file to keyfile. Default is config keyfile.
### Returns:
str: Encrypted password.
'''
if keyfile is None:
keyfile = self.config.key
with open(keyfile) as f:
key = RSA.import_key(f.read())
f.close()
publickey = key.publickey()
encryptor = PKCS1_OAEP.new(publickey)
password = encryptor.encrypt(password.encode("utf-8"))
return str(password)

View File

@ -36,20 +36,33 @@ class sync:
if os.path.exists(self.token_file):
creds = Credentials.from_authorized_user_file(self.token_file, self.scopes)
# If there are no valid credentials available, let the user log in.
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
self.google_client, self.scopes)
creds = flow.run_local_server(port=0, access_type='offline')
try:
# If there are no valid credentials available, let the user log in.
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
self.google_client, self.scopes)
creds = flow.run_local_server(port=0, access_type='offline')
# Save the credentials for the next run
# Save the credentials for the next run
with open(self.token_file, 'w') as token:
token.write(creds.to_json())
print("Logged in successfully.")
except RefreshError as e:
# If refresh fails, delete the invalid token file and start a new login flow
if os.path.exists(self.token_file):
os.remove(self.token_file)
print("Existing token was invalid and has been removed. Please log in again.")
flow = InstalledAppFlow.from_client_secrets_file(
self.google_client, self.scopes)
creds = flow.run_local_server(port=0, access_type='offline')
with open(self.token_file, 'w') as token:
token.write(creds.to_json())
print("Logged in successfully.")
print("Logged in successfully after re-authentication.")
def logout(self):
if os.path.exists(self.token_file):
@ -300,6 +313,8 @@ class sync:
if self.check_login_status() == True:
if not kwargs["result"]:
self.compress_and_upload()
else:
print("Sync cannot be performed. Please check your login status.")
return kwargs["result"]
def config_listener_pre(self, *args, **kwargs):

View File

@ -1,6 +1,6 @@
#!/usr/bin/env python3
#Imports
from functools import wraps, partial
from functools import wraps, partial, update_wrapper
#functions and classes
@ -59,6 +59,7 @@ class ClassHook:
"""Decorator class to enable Class Modifying"""
def __init__(self, cls):
self.cls = cls
update_wrapper(self, cls, updated=()) # Update wrapper without changing underlying items
# Initialize deferred class hooks if they don't already exist
if not hasattr(cls, 'deferred_class_hooks'):
cls.deferred_class_hooks = []

File diff suppressed because it is too large Load Diff