Создание AI-криптоагента: гибрид RSA-AES, подписи и адаптивный анализ рисков

Кратко о системе

Пошаговый пример показывает, как создать AI-ассистированную криптографическую систему агентов. Агенты генерируют RSA-ключи, устанавливают гибридные сессии RSA/AES, подписывают сообщения, анализируют паттерны коммуникаций на аномалии и предлагают ротацию ключей при необходимости.

Основные импорты и модель событий

В начале подключаются необходимые библиотеки и определяется dataclass SecurityEvent для логирования и анализа событий.

import hashlib, hmac, json, time, secrets, numpy as np
from dataclasses import dataclass
from typing import Dict, List
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend()


@dataclass
class SecurityEvent:
   timestamp: float
   event_type: str
   risk_score: float
   details: Dict

Класс агента: ключи, сессии и логирование

Каждый агент — это экземпляр CryptoAgent. Агент генерирует RSA-ключи, хранит сессионные ключи в памяти, отслеживает число операций шифрования и пишет события безопасности для последующего анализа.

class CryptoAgent:
   def __init__(self, agent_id: str):
       self.agent_id = agent_id
       self.private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend())
       self.public_key = self.private_key.public_key()
       self.session_keys = {}
       self.security_events = []
       self.encryption_count = 0
       self.key_rotation_threshold = 100


   def get_public_key_bytes(self) -> bytes:
       return self.public_key.public_bytes(
           encoding=serialization.Encoding.PEM,
           format=serialization.PublicFormat.SubjectPublicKeyInfo
       )


   def establish_session(self, partner_id: str, partner_public_key_bytes: bytes) -> bytes:
       session_key = secrets.token_bytes(32)
       self.session_keys[partner_id] = session_key
       partner_public_key = serialization.load_pem_public_key(partner_public_key_bytes, backend=default_backend())
       encrypted_session_key = partner_public_key.encrypt(
           session_key,
           padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
       )
       self.log_security_event("SESSION_ESTABLISHED", 0.1, {"partner": partner_id})
       return encrypted_session_key


   def receive_session_key(self, partner_id: str, encrypted_session_key: bytes):
       session_key = self.private_key.decrypt(
           encrypted_session_key,
           padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
       )
       self.session_keys[partner_id] = session_key

Гибридное шифрование: AES-GCM + RSA для обмена ключом

Сообщения шифруются AES-GCM с использованием сессионного симметричного ключа. Сам сессионный ключ передаётся зашифрованным RSA (OAEP). Каждое сообщение подписывается и получает оценку риска от аналитики агента.

 def encrypt_message(self, partner_id: str, plaintext: str) -> Dict:
       if partner_id not in self.session_keys:
           raise ValueError(f"No session established with {partner_id}")
       self.encryption_count += 1
       if self.encryption_count >= self.key_rotation_threshold:
           self.log_security_event("KEY_ROTATION_NEEDED", 0.3, {"count": self.encryption_count})
       iv = secrets.token_bytes(12)
       cipher = Cipher(algorithms.AES(self.session_keys[partner_id]), modes.GCM(iv), backend=default_backend())
       encryptor = cipher.encryptor()
       ciphertext = encryptor.update(plaintext.encode()) + encryptor.finalize()
       message_data = iv + ciphertext + encryptor.tag
       signature = self.sign_data(message_data)
       risk_score = self.analyze_encryption_pattern(len(plaintext))
       return {
           "sender": self.agent_id,
           "recipient": partner_id,
           "iv": iv.hex(),
           "ciphertext": ciphertext.hex(),
           "tag": encryptor.tag.hex(),
           "signature": signature.hex(),
           "timestamp": time.time(),
           "risk_score": risk_score
       }


   def decrypt_message(self, encrypted_msg: Dict) -> str:
       sender_id = encrypted_msg["sender"]
       if sender_id not in self.session_keys:
           raise ValueError(f"No session established with {sender_id}")
       iv = bytes.fromhex(encrypted_msg["iv"])
       ciphertext = bytes.fromhex(encrypted_msg["ciphertext"])
       tag = bytes.fromhex(encrypted_msg["tag"])
       cipher = Cipher(algorithms.AES(self.session_keys[sender_id]), modes.GCM(iv, tag), backend=default_backend())
       decryptor = cipher.decryptor()
       plaintext = decryptor.update(ciphertext) + decryptor.finalize()
       if encrypted_msg.get("risk_score", 0) > 0.7:
           self.log_security_event("HIGH_RISK_MESSAGE", 0.8, {"sender": sender_id})
       return plaintext.decode()

Подписи, анализ и отчёты

Агент подписывает данные и вычисляет оценку риска на основе длины сообщения, частоты шифрования и недавних событий. Все события накапливаются и агрегируются в отчёт о безопасности.

def sign_data(self, data: bytes) -> bytes:
       return self.private_key.sign(
           data,
           padg.PSS(mgf=padg.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH),
           hashes.SHA256()
       )


   def analyze_encryption_pattern(self, message_length: int) -> float:
       recent_events = self.security_events[-10:] if len(self.security_events) >= 10 else self.security_events
       avg_risk = np.mean([e.risk_score for e in recent_events]) if recent_events else 0.1
       risk_score = 0.1
       if message_length > 10000:
           risk_score += 0.3
       if self.encryption_count % 50 == 0 and self.encryption_count > 0:
           risk_score += 0.2
       risk_score = (risk_score + avg_risk) / 2
       self.log_security_event("ENCRYPTION_ANALYSIS", risk_score, {"msg_len": message_length})
       return min(risk_score, 1.0)


   def log_security_event(self, event_type: str, risk_score: float, details: Dict):
       event = SecurityEvent(timestamp=time.time(), event_type=event_type, risk_score=risk_score, details=details)
       self.security_events.append(event)


   def generate_security_report(self) -> Dict:
       if not self.security_events:
           return {"status": "No events recorded"}
       total_events = len(self.security_events)
       high_risk_events = [e for e in self.security_events if e.risk_score > 0.7]
       avg_risk = np.mean([e.risk_score for e in self.security_events])
       event_types = {}
       for event in self.security_events:
           event_types[event.event_type] = event_types.get(event.event_type, 0) + 1
       return {
           "agent_id": self.agent_id,
           "total_events": total_events,
           "high_risk_events": len(high_risk_events),
           "average_risk_score": round(avg_risk, 3),
           "encryption_count": self.encryption_count,
           "key_rotation_needed": self.encryption_count >= self.key_rotation_threshold,
           "event_breakdown": event_types,
           "security_status": "CRITICAL" if avg_risk > 0.7 else "WARNING" if avg_risk > 0.4 else "NORMAL"
       }

Демонстрация работы

Функция demo создаёт агентов Alice и Bob, устанавливает сессию, передаёт несколько сообщений и выводит расшифровку, оценки риска и сводные отчёты.

def demo_crypto_agent_system():
   print(" Advanced Cryptographic Agent System Demo\n")
   print("=" * 60)
   alice = CryptoAgent("Alice")
   bob = CryptoAgent("Bob")
   print("\n1. Agents Created")
   print(f"   Alice ID: {alice.agent_id}")
   print(f"   Bob ID: {bob.agent_id}")
   print("\n2. Establishing Secure Session (Hybrid Encryption)")
   alice_public_key = alice.get_public_key_bytes()
   bob_public_key = bob.get_public_key_bytes()
   encrypted_session_key = alice.establish_session("Bob", bob_public_key)
   bob.receive_session_key("Alice", encrypted_session_key)
   print(f"   ✓ Session established with {len(encrypted_session_key)} byte encrypted key")
   print("\n3. Encrypting and Transmitting Messages")
   messages = [
       "Hello Bob! This is a secure message.",
       "The launch codes are: Alpha-7-Charlie-9",
       "Meeting at 3 PM tomorrow.",
       "This is a very long message " * 100
   ]
   for i, msg in enumerate(messages, 1):
       encrypted = alice.encrypt_message("Bob", msg)
       print(f"\n   Message {i}:")
       print(f"   - Plaintext length: {len(msg)} chars")
       print(f"   - Ciphertext: {encrypted['ciphertext'][:60]}...")
       print(f"   - Risk Score: {encrypted['risk_score']:.3f}")
       print(f"   - Signature: {encrypted['signature'][:40]}...")
       decrypted = bob.decrypt_message(encrypted)
       print(f"   - Decrypted: {decrypted[:60]}{'...' if len(decrypted) > 60 else ''}")
       print(f"   - Verification: {'✓ SUCCESS' if decrypted == msg else '✗ FAILED'}")
   print("\n4. AI-Powered Security Analysis")
   print("\n   Alice's Security Report:")
   alice_report = alice.generate_security_report()
   for k, v in alice_report.items(): print(f"   - {k}: {v}")
   print("\n   Bob's Security Report:")
   bob_report = bob.generate_security_report()
   for k, v in bob_report.items(): print(f"   - {k}: {v}")
   print("\n" + "=" * 60)
   print("Demo Complete! Key Features Demonstrated:")
   print("✓ Hybrid encryption (RSA + AES-GCM)")
   print("✓ Digital signatures for authentication")
   print("✓ AI-powered anomaly detection")
   print("✓ Intelligent key rotation recommendations")
   print("✓ Real-time security monitoring")


if __name__ == "__main__":
   demo_crypto_agent_system()

Практические замечания