add hooks and sync to google
parent f96fe77aed
commit b7528027ac
.gitignore (vendored), 3 changed lines
@@ -127,3 +127,6 @@ dmypy.json

 # Pyre type checker
 .pyre/
+
+#clients
+*sync_client*
@@ -69,6 +69,7 @@ positional arguments:
     api         Start and stop connpy api
     plugin      Manage plugins
     config      Manage app config
+    sync        Sync config with Google
 ```

 ### Manage profiles:
@@ -50,6 +50,7 @@ Commands:
   api      Start and stop connpy api
   plugin   Manage plugins
   config   Manage app config
+  sync     Sync config with Google
 ```

 ### Manage profiles
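For orientation, the new subcommand's own help surface, reconstructed from the Parser class added later in this commit (layout approximate, help strings verbatim):

    $ connpy sync --help
    Commands:
      login    Login to Google to enable synchronization
      logout   Logout from Google
      start    Start synchronizing with Google
      stop     Stop any ongoing synchronization
      restore  Restore data from Google
      once     Backup current configuration to Google once
      status   Check the current status of synchronization
      list     List all backups stored on Google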
@@ -1,2 +1,2 @@
-__version__ = "3.8.0"
+__version__ = "4.0.0b1"
@@ -6,6 +6,8 @@ import re
 from Crypto.PublicKey import RSA
 from pathlib import Path
+from copy import deepcopy
+from .hooks import ConfigHook



 #functions and classes
@@ -105,15 +107,20 @@ class configfile:
         jsonconf.close()
         return jsondata

+    @ConfigHook
     def _saveconfig(self, conf):
         #Save config file
         newconfig = {"config":{}, "connections": {}, "profiles": {}}
         newconfig["config"] = self.config
         newconfig["connections"] = self.connections
         newconfig["profiles"] = self.profiles
-        with open(conf, "w") as f:
-            json.dump(newconfig, f, indent = 4)
-        f.close()
+        try:
+            with open(conf, "w") as f:
+                json.dump(newconfig, f, indent = 4)
+            f.close()
+        except:
+            return 1
+        return 0

     def _createkey(self, keyfile):
         #Create key file
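The decorator comes from the new connpy/hooks.py added further down. A minimal standalone sketch of the contract it gives _saveconfig, with illustrative names (Demo, audit_save) that are not part of the commit:

    from connpy.hooks import ConfigHook

    class Demo:
        @ConfigHook
        def save(self, path):
            print(f"writing {path}")
            return 0

    def audit_save(*args, result=None, **kwargs):
        # Post-hooks receive the original call arguments plus result=,
        # and whatever they return becomes the new result.
        print(f"save finished with result={result}")
        return result

    demo = Demo()
    demo.save.register_post_hook(audit_save)
    demo.save("config.json")  # prints both lines and returns 0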
@@ -159,9 +159,19 @@ class connapp:
         configcrud.add_argument("--openai-model", dest="model", nargs=1, action=self._store_type, help="Set openai model", metavar="MODEL")
         configparser.set_defaults(func=self._func_others)
         #Add plugins
-        file_path = self.config.defaultdir + "/plugins"
         self.plugins = Plugins()
-        self.plugins._import_plugins_to_argparse(file_path, subparsers)
+        try:
+            core_path = os.path.dirname(os.path.realpath(__file__)) + "/core_plugins"
+            self.plugins._import_plugins_to_argparse(core_path, subparsers)
+        except:
+            pass
+        try:
+            file_path = self.config.defaultdir + "/plugins"
+            self.plugins._import_plugins_to_argparse(file_path, subparsers)
+        except:
+            pass
+        for preload in self.plugins.preloads.values():
+            preload.Preload(self)
         #Generate helps
         nodeparser.usage = self._help("usage", subparsers)
         nodeparser.epilog = self._help("end", subparsers)
connpy/core_plugins/sync.py (new executable file, 422 lines)
@@ -0,0 +1,422 @@
#!/usr/bin/python3
import argparse
import os
import time
import zipfile
import tempfile
import io
import yaml
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
from google.auth.exceptions import RefreshError
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload
from googleapiclient.errors import HttpError
from datetime import datetime

class sync:

    def __init__(self, connapp):
        self.scopes = ['https://www.googleapis.com/auth/drive.appdata']
        self.token_file = f"{connapp.config.defaultdir}/gtoken.json"
        self.file = connapp.config.file
        self.key = connapp.config.key
        self.google_client = f"{os.path.dirname(os.path.abspath(__file__))}/sync_client"
        try:
            self.sync = connapp.config.config["sync"]
        except:
            self.sync = False
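The "sync" flag read here is the same setting the plugin's start/stop commands flip through connapp._change_settings. A sketch of the resulting entry in config.json, assuming _change_settings stores it under the "config" section that _saveconfig serializes (other keys elided):

    {
        "config": {
            "sync": true
        }
    }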
    def login(self):
        creds = None
        # The file token.json stores the user's access and refresh tokens.
        if os.path.exists(self.token_file):
            creds = Credentials.from_authorized_user_file(self.token_file, self.scopes)

        # If there are no valid credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    self.google_client, self.scopes)
                creds = flow.run_local_server(port=0, access_type='offline')

            # Save the credentials for the next run
            with open(self.token_file, 'w') as token:
                token.write(creds.to_json())

        print("Logged in successfully.")

    def logout(self):
        if os.path.exists(self.token_file):
            os.remove(self.token_file)
            print("Logged out successfully.")
        else:
            print("No credentials file found. Already logged out.")
    def get_credentials(self):
        # Load credentials from token.json
        if os.path.exists(self.token_file):
            creds = Credentials.from_authorized_user_file(self.token_file, self.scopes)
        else:
            print("Credentials file not found.")
            return 0

        # If there are no valid credentials available, ask the user to log in again
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                try:
                    creds.refresh(Request())
                except RefreshError:
                    print("Could not refresh access token. Please log in again.")
                    return 0
            else:
                print("Credentials are missing or invalid. Please log in.")
                return 0
        return creds

    def check_login_status(self):
        # Check if the credentials file exists
        if os.path.exists(self.token_file):
            # Load credentials from token.json
            creds = Credentials.from_authorized_user_file(self.token_file)

            # If credentials are expired, refresh them
            if creds and creds.expired and creds.refresh_token:
                try:
                    creds.refresh(Request())
                except RefreshError:
                    pass

            # Check if the credentials are valid after refresh
            if creds.valid:
                return True
            else:
                return "Invalid"
        else:
            return False

    def status(self):
        print(f"Login: {self.check_login_status()}")
        print(f"Sync: {self.sync}")
    def get_appdata_files(self):

        creds = self.get_credentials()
        if not creds:
            return 0

        try:
            # Create the Google Drive service
            service = build("drive", "v3", credentials=creds)

            # List files in the appDataFolder
            response = (
                service.files()
                .list(
                    spaces="appDataFolder",
                    fields="files(id, name, appProperties)",
                    pageSize=10,
                )
                .execute()
            )

            files_info = []
            for file in response.get("files", []):
                # Extract file information
                file_id = file.get("id")
                file_name = file.get("name")
                timestamp = file.get("appProperties", {}).get("timestamp")
                human_readable_date = file.get("appProperties", {}).get("date")
                files_info.append({"name": file_name, "id": file_id, "date": human_readable_date, "timestamp": timestamp})

            return files_info

        except HttpError as error:
            print(f"An error occurred: {error}")
            return 0

    def dump_appdata_files_yaml(self):
        files_info = self.get_appdata_files()
        if not files_info:
            print("Failed to retrieve files or no files found.")
            return
        # Pretty print as YAML
        yaml_output = yaml.dump(files_info, sort_keys=False, default_flow_style=False)
        print(yaml_output)
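For reference, this is the structure `connpy sync list` pretty-prints through dump_appdata_files_yaml above; a sketch with invented placeholder values:

    - name: connpy-backup-1712345678901.zip
      id: 1AbCdEfGhIjKlMnOpQrStUvWxYz
      date: '2024-04-05 14:21:18'
      timestamp: '1712345678901'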
    def backup_file_to_drive(self, file_path, timestamp):

        creds = self.get_credentials()
        if not creds:
            return 1

        # Create the Google Drive service
        service = build('drive', 'v3', credentials=creds)

        # Convert timestamp to a human-readable date
        human_readable_date = datetime.fromtimestamp(timestamp/1000).strftime('%Y-%m-%d %H:%M:%S')

        # Upload the file to Google Drive with timestamp metadata
        file_metadata = {
            'name': os.path.basename(file_path),
            'parents': ["appDataFolder"],
            'appProperties': {
                'timestamp': str(timestamp),
                'date': human_readable_date  # Add human-readable date attribute
            }
        }
        media = MediaFileUpload(file_path)

        try:
            file = service.files().create(body=file_metadata, media_body=media, fields='id').execute()
            return 0
        except Exception as e:
            return f"An error occurred: {e}"

    def delete_file_by_id(self, file_id):
        creds = self.get_credentials()
        if not creds:
            return 1

        try:
            # Create the Google Drive service
            service = build("drive", "v3", credentials=creds)

            # Delete the file
            service.files().delete(fileId=file_id).execute()
            return 0
        except Exception as e:
            return f"An error occurred: {e}"
    def compress_specific_files(self, zip_path):
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            zipf.write(self.file, "config.json")
            zipf.write(self.key, ".osk")

    def compress_and_upload(self):
        # Read the file content to get the folder path
        timestamp = int(time.time() * 1000)
        # Create a temporary directory for storing the zip file
        with tempfile.TemporaryDirectory() as tmp_dir:
            # Compress specific files from the folder path to a zip file in the temporary directory
            zip_path = os.path.join(tmp_dir, f"connpy-backup-{timestamp}.zip")
            self.compress_specific_files(zip_path)

            # Get the files in the app data folder
            app_data_files = self.get_appdata_files()
            if app_data_files == 0:
                return 1

            # If there are 10 or more files, remove the oldest one based on timestamp
            if len(app_data_files) >= 10:
                oldest_file = min(app_data_files, key=lambda x: x['timestamp'])
                delete_old = self.delete_file_by_id(oldest_file['id'])
                if delete_old:
                    print(delete_old)
                    return 1

            # Upload the new file
            upload_new = self.backup_file_to_drive(zip_path, timestamp)
            if upload_new:
                print(upload_new)
                return 1

            print("Backup to google uploaded successfully.")
            return 0
    def decompress_zip(self, zip_path):
        try:
            with zipfile.ZipFile(zip_path, 'r') as zipf:
                # Extract the specific file to the specified destination
                zipf.extract("config.json", os.path.dirname(self.file))
                zipf.extract(".osk", os.path.dirname(self.key))
            return 0
        except Exception as e:
            print(f"An error occurred: {e}")
            return 1

    def download_file_by_id(self, file_id, destination_path):

        creds = self.get_credentials()
        if not creds:
            return 1

        try:
            # Create the Google Drive service
            service = build('drive', 'v3', credentials=creds)

            # Download the file
            request = service.files().get_media(fileId=file_id)
            fh = io.FileIO(destination_path, mode='wb')
            downloader = MediaIoBaseDownload(fh, request)
            done = False
            while done is False:
                status, done = downloader.next_chunk()

            return 0
        except Exception as e:
            return f"An error occurred: {e}"

    def restore_last_config(self, file_id=None):
        # Get the files in the app data folder
        app_data_files = self.get_appdata_files()
        if not app_data_files:
            print("No files found in app data folder.")
            return 1

        # Check if a specific file_id was provided and if it exists in the list
        if file_id:
            selected_file = next((f for f in app_data_files if f['id'] == file_id), None)
            if not selected_file:
                print(f"No file found with ID: {file_id}")
                return 1
        else:
            # Find the latest file based on timestamp
            selected_file = max(app_data_files, key=lambda x: x['timestamp'])

        # Download the selected file to a temporary location
        temp_download_path = os.path.join(tempfile.gettempdir(), 'connpy-backup.zip')
        if self.download_file_by_id(selected_file['id'], temp_download_path):
            return 1

        # Unzip the downloaded file to the destination folder
        if self.decompress_zip(temp_download_path):
            print("Failed to decompress the file.")
            return 1

        print(f"Backup from Google Drive restored successfully: {selected_file['name']}")
        return 0
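A hedged restore session built from the messages above (the ID and file name are placeholders):

    $ connpy sync restore --id 1AbCdEfGhIjKlMnOpQrStUvWxYz
    Backup from Google Drive restored successfully: connpy-backup-1712345678901.zip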
    # @staticmethod
    def config_listener_post(self, file, result):
        if self.sync:
            if self.check_login_status() == True:
                if not result:
                    self.compress_and_upload()
        return result

class Preload:
    def __init__(self, connapp):
        syncapp = sync(connapp)
        connapp.config._saveconfig.register_post_hook(syncapp.config_listener_post)

class Parser:
    def __init__(self):
        self.parser = argparse.ArgumentParser(description="Sync config with Google")
        self.description = "Sync config with Google"
        subparsers = self.parser.add_subparsers(title="Commands", dest='command', metavar="")
        login_parser = subparsers.add_parser("login", help="Login to Google to enable synchronization")
        logout_parser = subparsers.add_parser("logout", help="Logout from Google")
        start_parser = subparsers.add_parser("start", help="Start synchronizing with Google")
        stop_parser = subparsers.add_parser("stop", help="Stop any ongoing synchronization")
        restore_parser = subparsers.add_parser("restore", help="Restore data from Google")
        backup_parser = subparsers.add_parser("once", help="Backup current configuration to Google once")
        restore_parser.add_argument("--id", type=str, help="Optional file ID to restore a specific backup", required=False)
        status_parser = subparsers.add_parser("status", help="Check the current status of synchronization")
        list_parser = subparsers.add_parser("list", help="List all backups stored on Google")

class Entrypoint:
    def __init__(self, args, parser, connapp):
        syncapp = sync(connapp)
        # print(args)
        # print(syncapp.__dict__)
        if args.command == 'login':
            syncapp.login()
        elif args.command == "status":
            syncapp.status()
        elif args.command == "start":
            connapp._change_settings("sync", True)
        elif args.command == "stop":
            connapp._change_settings("sync", False)
        elif args.command == "list":
            syncapp.dump_appdata_files_yaml()
        elif args.command == "once":
            syncapp.compress_and_upload()
        elif args.command == "restore":
            syncapp.restore_last_config(args.id)
        elif args.command == "logout":
            syncapp.logout()
        # if args.command == 'netmask':
        #     if args.file:
        #         for line in args.file:
        #             line = line.strip()
        #             if line:
        #                 print(NetmaskTools.process_input(args.conversion, line.strip()))
        #     else:
        #         input_str = ' '.join(args.input)
        #         print(NetmaskTools.process_input(args.conversion, input_str))
        # elif args.command == 'summarize':
        #     with args.file as file:
        #         subnets = [line.strip() for line in file if line.strip()]
        #         summarized = Sumarize.summarize_subnets(subnets, args.mode)
        #         if isinstance(summarized, list):
        #             for subnet in summarized:
        #                 print(subnet)
        #         else:
        #             print(summarized)
        # elif args.command == 'password':
        #     if connapp:
        #         for passwd in Password.get_passwords(args, connapp):
        #             print(passwd)
        # elif args.command == 'connect':
        #     Connect.connect_command(args)
        # else:
        #     parser.print_help()


def _connpy_completion(wordsnumber, words, info = None):
    if wordsnumber == 3:
        result = ["--help", "netmask", "summarize", "password", "connect"]
    #NETMASK_completion
    if wordsnumber == 4 and words[1] == "netmask":
        result = ['cidr_to_netmask', 'cidr_to_wildcard',
                  'netmask_to_cidr', 'wildcard_to_cidr',
                  'netmask_to_wildcard', 'wildcard_to_netmask', 'cidr_to_range', "--file", "--help"]
    elif wordsnumber == 6 and words[1] == "netmask" and words[2] in ["-f", "--file"]:
        result = ['cidr_to_netmask', 'cidr_to_wildcard',
                  'netmask_to_cidr', 'wildcard_to_cidr',
                  'netmask_to_wildcard', 'wildcard_to_netmask']
    elif wordsnumber == 5 and words[1] == "netmask" and words[2] in ["-f", "--file"]:
        result = _getcwd(words, words[2])
    elif wordsnumber == 6 and words[1] == "netmask" and words[3] in ["-f", "--file"]:
        result = _getcwd(words, words[2])
    #SUMMARIZE_completion
    elif wordsnumber == 4 and words[1] == "summarize":
        result = _getcwd(words, words[1])
        result.extend(["--mode", "--help"])
    elif wordsnumber == 5 and words[1] == "summarize":
        if words[2] == "--mode":
            result = ["strict", "inclusive"]
        else:
            result = ["--mode"]
    elif wordsnumber == 6 and words[1] == "summarize":
        if words[3] == "--mode":
            result = ["strict", "inclusive"]
        elif words[3] in ["strict", "inclusive"]:
            result = _getcwd(words, words[3])
    #PASSWORD_completion
    elif wordsnumber == 4 and words[1] == "password":
        result = info["nodes"]
        result.extend(info["profiles"])
        result.extend(["--help", "--profile"])
    elif wordsnumber == 5 and words[1] == "password":
        if words[2] == "--profile":
            result = info["profiles"]
        else:
            result = ["--profile"]
    #CONNECT_completion
    elif wordsnumber == 4 and words[1] == "connect":
        result = ["start", "stop", "restart", "--help"]

    return result

if __name__ == "__main__":
    parser = Parser()
    args = parser.parser.parse_args()
    Entrypoint(args, parser.parser, None)
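How the pieces are meant to chain once sync is enabled, sketched as a commented trace (an editor's reading of this commit, not output from it):

    # Preload registered sync.config_listener_post on the ConfigHook-wrapped
    # configfile._saveconfig (see connpy/hooks.py below). A normal config
    # save then fans out roughly like this:
    #
    #   connapp.config._saveconfig(conf)      # writes config.json, returns 0
    #     -> ConfigHook.__call__ runs post-hooks with result=0
    #       -> sync.config_listener_post(conf, result=0)
    #         -> config["sync"] is True, login status is True, result is falsy
    #           -> compress_and_upload(): zip config.json and .osk,
    #              rotate down to 10 backups, upload to appDataFolder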
connpy/hooks.py (new executable file, 48 lines)
@@ -0,0 +1,48 @@
#!/usr/bin/env python3
#Imports
from functools import wraps


#functions and classes

class ConfigHook:
    """Decorator class to enable Config save hooking"""

    def __init__(self, func):
        self.func = func
        self.pre_hooks = []   # List to store registered pre-hooks
        self.post_hooks = []  # List to store registered post-hooks
        wraps(func)(self)

    def __call__(self, *args, **kwargs):
        # Execute pre-hooks before the original function
        for hook in self.pre_hooks:
            try:
                args, kwargs = hook(*args, **kwargs)
            except Exception as e:
                print(f"ConfigHook Pre-hook raised an exception: {e}")

        try:
            # Execute original function
            result = self.func(self.instance, *args, **kwargs)

        finally:
            # Execute post-hooks after the original function
            for hook in self.post_hooks:
                try:
                    result = hook(*args, **kwargs, result=result)  # Pass result to hooks
                except Exception as e:
                    print(f"ConfigHook Post-hook raised an exception: {e}")

        return result

    def __get__(self, instance, owner):
        self.instance = instance
        return self

    def register_pre_hook(self, hook):
        """Register a function to be called before the original function"""
        self.pre_hooks.append(hook)

    def register_post_hook(self, hook):
        """Register a function to be called after the original function"""
        self.post_hooks.append(hook)
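Pre-hooks follow the symmetric contract: each receives the call's arguments and must return the (args, kwargs) pair, so a hook can rewrite the target path before the save runs. A minimal sketch with a hypothetical hook name:

    def redirect_save(*args, **kwargs):
        # Pre-hooks must return (args, kwargs); here we swap the
        # destination file, purely for illustration.
        if args:
            args = ("/tmp/connpy-staging.json",) + args[1:]
        return args, kwargs

    # With a live connapp instance, registration mirrors what the sync
    # plugin does for its post-hook:
    # connapp.config._saveconfig.register_pre_hook(redirect_save)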
@@ -9,6 +9,7 @@ class Plugins:
     def __init__(self):
         self.plugins = {}
         self.plugin_parsers = {}
+        self.preloads = {}

     def verify_script(self, file_path):
         """
@@ -28,7 +29,7 @@ class Plugins:

         ### Verifications:
         - The presence of only allowed top-level elements.
-        - The existence of two specific classes: 'Parser' and 'Entrypoint'.
+        - The existence of the two specific classes 'Parser' and 'Entrypoint', and/or the specific class 'Preload'.
         - 'Parser' class must only have an '__init__' method and must assign 'self.parser'
           and 'self.description'.
         - 'Entrypoint' class must have an '__init__' method accepting specific arguments.
@@ -49,8 +50,10 @@ class Plugins:
         except SyntaxError as e:
             return f"Syntax error in file: {e}"

-        required_classes = {'Parser', 'Entrypoint'}
-        found_classes = set()
+        has_parser = False
+        has_entrypoint = False
+        has_preload = False

         for node in tree.body:
             # Allow only function definitions, class definitions, and pass statements at top-level
@@ -66,10 +69,10 @@ class Plugins:
             elif not isinstance(node, (ast.FunctionDef, ast.ClassDef, ast.Import, ast.ImportFrom, ast.Pass)):
                 return f"Plugin can only have pass, functions, classes and imports. {node} is not allowed"  # Reject any other AST types

-            if isinstance(node, ast.ClassDef) and node.name in required_classes:
-                found_classes.add(node.name)
+            if isinstance(node, ast.ClassDef):

                 if node.name == 'Parser':
+                    has_parser = True
                     # Ensure Parser class has only the __init__ method and assigns self.parser
                     if not all(isinstance(method, ast.FunctionDef) and method.name == '__init__' for method in node.body):
                         return "Parser class should only have __init__ method"
@@ -81,14 +84,27 @@ class Plugins:
                     return "Parser class should set self.parser and self.description"  # 'self.parser' or 'self.description' not assigned in __init__

                 elif node.name == 'Entrypoint':
+                    has_entrypoint = True
                     init_method = next((item for item in node.body if isinstance(item, ast.FunctionDef) and item.name == '__init__'), None)
                     if not init_method or len(init_method.args.args) != 4:  # self, args, parser, conapp
-                        return "Entrypoint class should accept only arguments: args, parser and connapp"  # 'Entrypoint' __init__ does not have correct signature
+                        return "Entrypoint class should have method __init__ and accept only arguments: args, parser and connapp"  # 'Entrypoint' __init__ does not have correct signature

-        if required_classes == found_classes:
-            return False
-        else:
-            return "Classes Entrypoint and Parser are mandatory"
+                elif node.name == 'Preload':
+                    has_preload = True
+                    init_method = next((item for item in node.body if isinstance(item, ast.FunctionDef) and item.name == '__init__'), None)
+                    if not init_method or len(init_method.args.args) != 2:  # self, connapp
+                        return "Preload class should have method __init__ and accept only argument: connapp"  # 'Preload' __init__ does not have correct signature

+        # Applying the combination logic based on class presence
+        if has_parser and not has_entrypoint:
+            return "Parser requires Entrypoint class to be present."
+        elif has_entrypoint and not has_parser:
+            return "Entrypoint requires Parser class to be present."

+        if not (has_parser or has_entrypoint or has_preload):
+            return "No valid class (Parser, Entrypoint, or Preload) found."

+        return False  # All requirements met, no error

     def _import_from_path(self, path):
         spec = importlib.util.spec_from_file_location("module.name", path)
@@ -108,9 +124,13 @@ class Plugins:
                 filepath = os.path.join(directory, filename)
                 check_file = self.verify_script(filepath)
                 if check_file:
                     print(f"Failed to load plugin: {filename}. Reason: {check_file}")
                     continue
                 else:
                     self.plugins[root_filename] = self._import_from_path(filepath)
-                    self.plugin_parsers[root_filename] = self.plugins[root_filename].Parser()
-                    subparsers.add_parser(root_filename, parents=[self.plugin_parsers[root_filename].parser], add_help=False, description=self.plugin_parsers[root_filename].description)
+                    if hasattr(self.plugins[root_filename], "Parser"):
+                        self.plugin_parsers[root_filename] = self.plugins[root_filename].Parser()
+                        subparsers.add_parser(root_filename, parents=[self.plugin_parsers[root_filename].parser], add_help=False, description=self.plugin_parsers[root_filename].description)
+                    if hasattr(self.plugins[root_filename], "Preload"):
+                        self.preloads[root_filename] = self.plugins[root_filename]
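Read together with the sync plugin above, the relaxed verifier now accepts plugins that expose Parser plus Entrypoint, Preload alone, or all three. A minimal skeleton that passes every check in verify_script (file contents are illustrative, not from the commit):

    import argparse

    class Preload:
        # Optional: instantiated once at startup via plugins.preloads.
        def __init__(self, connapp):
            print(f"plugin loaded, config dir: {connapp.config.defaultdir}")

    class Parser:
        # Must only define __init__ and must assign self.parser and self.description.
        def __init__(self):
            self.parser = argparse.ArgumentParser(description="Example plugin")
            self.description = "Example plugin"

    class Entrypoint:
        # Must accept exactly args, parser and connapp.
        def __init__(self, args, parser, connapp):
            print("example plugin invoked")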
@@ -1,10 +1,12 @@
 Flask>=2.3.2
-inquirer>=3.1.3
-openai>=0.27.6
+google_api_python_client>=2.125.0
+google_auth_oauthlib>=1.2.0
+inquirer>=3.2.4
+openai>=0.27.8
 pexpect>=4.8.0
-pycryptodome>=3.17
+protobuf>=5.26.1
+pycryptodome>=3.18.0
 pyfzf>=0.3.1
-PyYAML>=6.0
-setuptools>=67.8.0
-rich>=13.4.2
+PyYAML>=6.0.1
+rich>=13.7.1
 waitress>=2.1.2