Building an AI-Powered Crypto Agent: Hybrid RSA-AES, Signatures & Adaptive Risk Analysis

Overview

This guide shows how to build an AI-powered cryptographic agent system that combines classical cryptography with adaptive intelligence. Agents generate RSA keypairs, establish hybrid RSA/AES sessions, sign messages, analyze communication patterns for anomalies, and recommend key rotations when needed.

Core imports and event model

The implementation starts by importing required libraries and defining a simple SecurityEvent dataclass used to log and analyze system events.

import hashlib, hmac, json, time, secrets, numpy as np
from dataclasses import dataclass
from typing import Dict, List
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend()


@dataclass
class SecurityEvent:
   timestamp: float
   event_type: str
   risk_score: float
   details: Dict

Agent class: keys, sessions and event tracking

Agents are implemented as instances of CryptoAgent. Each agent generates an RSA keypair, stores session keys in-memory, tracks encryption counts, and logs security events for later analysis.

class CryptoAgent:
   def __init__(self, agent_id: str):
       self.agent_id = agent_id
       self.private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend())
       self.public_key = self.private_key.public_key()
       self.session_keys = {}
       self.security_events = []
       self.encryption_count = 0
       self.key_rotation_threshold = 100


   def get_public_key_bytes(self) -> bytes:
       return self.public_key.public_bytes(
           encoding=serialization.Encoding.PEM,
           format=serialization.PublicFormat.SubjectPublicKeyInfo
       )


   def establish_session(self, partner_id: str, partner_public_key_bytes: bytes) -> bytes:
       session_key = secrets.token_bytes(32)
       self.session_keys[partner_id] = session_key
       partner_public_key = serialization.load_pem_public_key(partner_public_key_bytes, backend=default_backend())
       encrypted_session_key = partner_public_key.encrypt(
           session_key,
           padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
       )
       self.log_security_event("SESSION_ESTABLISHED", 0.1, {"partner": partner_id})
       return encrypted_session_key


   def receive_session_key(self, partner_id: str, encrypted_session_key: bytes):
       session_key = self.private_key.decrypt(
           encrypted_session_key,
           padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
       )
       self.session_keys[partner_id] = session_key

Hybrid encryption: AES-GCM payloads + RSA-encrypted session keys

Messages are encrypted with AES-GCM using a per-session symmetric key. The session key itself is exchanged encrypted with the recipient’s RSA public key (OAEP). Each encrypted payload is also signed and assigned a risk score by the agent’s analysis routines.

 def encrypt_message(self, partner_id: str, plaintext: str) -> Dict:
       if partner_id not in self.session_keys:
           raise ValueError(f"No session established with {partner_id}")
       self.encryption_count += 1
       if self.encryption_count >= self.key_rotation_threshold:
           self.log_security_event("KEY_ROTATION_NEEDED", 0.3, {"count": self.encryption_count})
       iv = secrets.token_bytes(12)
       cipher = Cipher(algorithms.AES(self.session_keys[partner_id]), modes.GCM(iv), backend=default_backend())
       encryptor = cipher.encryptor()
       ciphertext = encryptor.update(plaintext.encode()) + encryptor.finalize()
       message_data = iv + ciphertext + encryptor.tag
       signature = self.sign_data(message_data)
       risk_score = self.analyze_encryption_pattern(len(plaintext))
       return {
           "sender": self.agent_id,
           "recipient": partner_id,
           "iv": iv.hex(),
           "ciphertext": ciphertext.hex(),
           "tag": encryptor.tag.hex(),
           "signature": signature.hex(),
           "timestamp": time.time(),
           "risk_score": risk_score
       }


   def decrypt_message(self, encrypted_msg: Dict) -> str:
       sender_id = encrypted_msg["sender"]
       if sender_id not in self.session_keys:
           raise ValueError(f"No session established with {sender_id}")
       iv = bytes.fromhex(encrypted_msg["iv"])
       ciphertext = bytes.fromhex(encrypted_msg["ciphertext"])
       tag = bytes.fromhex(encrypted_msg["tag"])
       cipher = Cipher(algorithms.AES(self.session_keys[sender_id]), modes.GCM(iv, tag), backend=default_backend())
       decryptor = cipher.decryptor()
       plaintext = decryptor.update(ciphertext) + decryptor.finalize()
       if encrypted_msg.get("risk_score", 0) > 0.7:
           self.log_security_event("HIGH_RISK_MESSAGE", 0.8, {"sender": sender_id})
       return plaintext.decode()

Signatures, analysis and reporting

The agent signs message payloads and runs heuristic analysis to compute a risk score based on message length, recent events and encryption frequency. All events are logged and summarized in a security report.

def sign_data(self, data: bytes) -> bytes:
       return self.private_key.sign(
           data,
           padg.PSS(mgf=padg.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH),
           hashes.SHA256()
       )


   def analyze_encryption_pattern(self, message_length: int) -> float:
       recent_events = self.security_events[-10:] if len(self.security_events) >= 10 else self.security_events
       avg_risk = np.mean([e.risk_score for e in recent_events]) if recent_events else 0.1
       risk_score = 0.1
       if message_length > 10000:
           risk_score += 0.3
       if self.encryption_count % 50 == 0 and self.encryption_count > 0:
           risk_score += 0.2
       risk_score = (risk_score + avg_risk) / 2
       self.log_security_event("ENCRYPTION_ANALYSIS", risk_score, {"msg_len": message_length})
       return min(risk_score, 1.0)


   def log_security_event(self, event_type: str, risk_score: float, details: Dict):
       event = SecurityEvent(timestamp=time.time(), event_type=event_type, risk_score=risk_score, details=details)
       self.security_events.append(event)


   def generate_security_report(self) -> Dict:
       if not self.security_events:
           return {"status": "No events recorded"}
       total_events = len(self.security_events)
       high_risk_events = [e for e in self.security_events if e.risk_score > 0.7]
       avg_risk = np.mean([e.risk_score for e in self.security_events])
       event_types = {}
       for event in self.security_events:
           event_types[event.event_type] = event_types.get(event.event_type, 0) + 1
       return {
           "agent_id": self.agent_id,
           "total_events": total_events,
           "high_risk_events": len(high_risk_events),
           "average_risk_score": round(avg_risk, 3),
           "encryption_count": self.encryption_count,
           "key_rotation_needed": self.encryption_count >= self.key_rotation_threshold,
           "event_breakdown": event_types,
           "security_status": "CRITICAL" if avg_risk > 0.7 else "WARNING" if avg_risk > 0.4 else "NORMAL"
       }

Demo flow: creating agents, exchanging sessions, sending messages

A demo function creates two agents (Alice and Bob), performs hybrid key exchange, sends several messages, and prints decrypted results along with risk scores and security reports.

def demo_crypto_agent_system():
   print(" Advanced Cryptographic Agent System Demo\n")
   print("=" * 60)
   alice = CryptoAgent("Alice")
   bob = CryptoAgent("Bob")
   print("\n1. Agents Created")
   print(f"   Alice ID: {alice.agent_id}")
   print(f"   Bob ID: {bob.agent_id}")
   print("\n2. Establishing Secure Session (Hybrid Encryption)")
   alice_public_key = alice.get_public_key_bytes()
   bob_public_key = bob.get_public_key_bytes()
   encrypted_session_key = alice.establish_session("Bob", bob_public_key)
   bob.receive_session_key("Alice", encrypted_session_key)
   print(f"   ✓ Session established with {len(encrypted_session_key)} byte encrypted key")
   print("\n3. Encrypting and Transmitting Messages")
   messages = [
       "Hello Bob! This is a secure message.",
       "The launch codes are: Alpha-7-Charlie-9",
       "Meeting at 3 PM tomorrow.",
       "This is a very long message " * 100
   ]
   for i, msg in enumerate(messages, 1):
       encrypted = alice.encrypt_message("Bob", msg)
       print(f"\n   Message {i}:")
       print(f"   - Plaintext length: {len(msg)} chars")
       print(f"   - Ciphertext: {encrypted['ciphertext'][:60]}...")
       print(f"   - Risk Score: {encrypted['risk_score']:.3f}")
       print(f"   - Signature: {encrypted['signature'][:40]}...")
       decrypted = bob.decrypt_message(encrypted)
       print(f"   - Decrypted: {decrypted[:60]}{'...' if len(decrypted) > 60 else ''}")
       print(f"   - Verification: {'✓ SUCCESS' if decrypted == msg else '✗ FAILED'}")
   print("\n4. AI-Powered Security Analysis")
   print("\n   Alice's Security Report:")
   alice_report = alice.generate_security_report()
   for k, v in alice_report.items(): print(f"   - {k}: {v}")
   print("\n   Bob's Security Report:")
   bob_report = bob.generate_security_report()
   for k, v in bob_report.items(): print(f"   - {k}: {v}")
   print("\n" + "=" * 60)
   print("Demo Complete! Key Features Demonstrated:")
   print("✓ Hybrid encryption (RSA + AES-GCM)")
   print("✓ Digital signatures for authentication")
   print("✓ AI-powered anomaly detection")
   print("✓ Intelligent key rotation recommendations")
   print("✓ Real-time security monitoring")


if __name__ == "__main__":
   demo_crypto_agent_system()

Practical notes

This compact system demonstrates how adaptive intelligence can be layered on top of proven cryptographic primitives to enable self-monitoring, anomaly detection, and automated security recommendations.