D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
imunify360
/
venv
/
lib64
/
python3.11
/
site-packages
/
im360
/
simple_rpc
/
Filename :
resident_socket.py
back
Copy
import socket import json from defence360agent.contracts.config import ( GENERIC_SENSOR_SOCKET_PATH, ) async def send_to_socket(msg, timeout=15, wait_for_response=True): with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock: sock.settimeout(timeout) result = b"" data = True try: sock.connect(GENERIC_SENSOR_SOCKET_PATH) sock.sendall(json.dumps(msg).encode() + b"\n") if not wait_for_response: return {} while data: sock.settimeout(timeout) data = sock.recv(8192) result += data if data.find(b"\n") != -1: return json.loads(result.decode()) return json.loads(result.decode()) except ( ConnectionRefusedError, FileNotFoundError, ): return "Failed to send to socket, check your socket active" except socket.timeout: return ( result.decode() if result else "Failed to send to socket in time" ) except json.JSONDecodeError: return "Failed to decode json answer from a plugin"