Building an AI-Powered Crypto Agent: Hybrid RSA-AES, Signatures & Adaptive Risk Analysis
Overview
This guide shows how to build an AI-powered cryptographic agent system that combines classical cryptography with adaptive intelligence. Agents generate RSA keypairs, establish hybrid RSA/AES sessions, sign messages, analyze communication patterns for anomalies, and recommend key rotations when needed.
Core imports and event model
The implementation starts by importing required libraries and defining a simple SecurityEvent dataclass used to log and analyze system events.
import hashlib, hmac, json, time, secrets, numpy as np
from dataclasses import dataclass
from typing import Dict, List
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend()
@dataclass
class SecurityEvent:
timestamp: float
event_type: str
risk_score: float
details: Dict
Agent class: keys, sessions and event tracking
Agents are implemented as instances of CryptoAgent. Each agent generates an RSA keypair, stores session keys in-memory, tracks encryption counts, and logs security events for later analysis.
class CryptoAgent:
def __init__(self, agent_id: str):
self.agent_id = agent_id
self.private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048, backend=default_backend())
self.public_key = self.private_key.public_key()
self.session_keys = {}
self.security_events = []
self.encryption_count = 0
self.key_rotation_threshold = 100
def get_public_key_bytes(self) -> bytes:
return self.public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
def establish_session(self, partner_id: str, partner_public_key_bytes: bytes) -> bytes:
session_key = secrets.token_bytes(32)
self.session_keys[partner_id] = session_key
partner_public_key = serialization.load_pem_public_key(partner_public_key_bytes, backend=default_backend())
encrypted_session_key = partner_public_key.encrypt(
session_key,
padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
)
self.log_security_event("SESSION_ESTABLISHED", 0.1, {"partner": partner_id})
return encrypted_session_key
def receive_session_key(self, partner_id: str, encrypted_session_key: bytes):
session_key = self.private_key.decrypt(
encrypted_session_key,
padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
)
self.session_keys[partner_id] = session_key
Hybrid encryption: AES-GCM payloads + RSA-encrypted session keys
Messages are encrypted with AES-GCM using a per-session symmetric key. The session key itself is exchanged encrypted with the recipient’s RSA public key (OAEP). Each encrypted payload is also signed and assigned a risk score by the agent’s analysis routines.
def encrypt_message(self, partner_id: str, plaintext: str) -> Dict:
if partner_id not in self.session_keys:
raise ValueError(f"No session established with {partner_id}")
self.encryption_count += 1
if self.encryption_count >= self.key_rotation_threshold:
self.log_security_event("KEY_ROTATION_NEEDED", 0.3, {"count": self.encryption_count})
iv = secrets.token_bytes(12)
cipher = Cipher(algorithms.AES(self.session_keys[partner_id]), modes.GCM(iv), backend=default_backend())
encryptor = cipher.encryptor()
ciphertext = encryptor.update(plaintext.encode()) + encryptor.finalize()
message_data = iv + ciphertext + encryptor.tag
signature = self.sign_data(message_data)
risk_score = self.analyze_encryption_pattern(len(plaintext))
return {
"sender": self.agent_id,
"recipient": partner_id,
"iv": iv.hex(),
"ciphertext": ciphertext.hex(),
"tag": encryptor.tag.hex(),
"signature": signature.hex(),
"timestamp": time.time(),
"risk_score": risk_score
}
def decrypt_message(self, encrypted_msg: Dict) -> str:
sender_id = encrypted_msg["sender"]
if sender_id not in self.session_keys:
raise ValueError(f"No session established with {sender_id}")
iv = bytes.fromhex(encrypted_msg["iv"])
ciphertext = bytes.fromhex(encrypted_msg["ciphertext"])
tag = bytes.fromhex(encrypted_msg["tag"])
cipher = Cipher(algorithms.AES(self.session_keys[sender_id]), modes.GCM(iv, tag), backend=default_backend())
decryptor = cipher.decryptor()
plaintext = decryptor.update(ciphertext) + decryptor.finalize()
if encrypted_msg.get("risk_score", 0) > 0.7:
self.log_security_event("HIGH_RISK_MESSAGE", 0.8, {"sender": sender_id})
return plaintext.decode()
Signatures, analysis and reporting
The agent signs message payloads and runs heuristic analysis to compute a risk score based on message length, recent events and encryption frequency. All events are logged and summarized in a security report.
def sign_data(self, data: bytes) -> bytes:
return self.private_key.sign(
data,
padg.PSS(mgf=padg.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH),
hashes.SHA256()
)
def analyze_encryption_pattern(self, message_length: int) -> float:
recent_events = self.security_events[-10:] if len(self.security_events) >= 10 else self.security_events
avg_risk = np.mean([e.risk_score for e in recent_events]) if recent_events else 0.1
risk_score = 0.1
if message_length > 10000:
risk_score += 0.3
if self.encryption_count % 50 == 0 and self.encryption_count > 0:
risk_score += 0.2
risk_score = (risk_score + avg_risk) / 2
self.log_security_event("ENCRYPTION_ANALYSIS", risk_score, {"msg_len": message_length})
return min(risk_score, 1.0)
def log_security_event(self, event_type: str, risk_score: float, details: Dict):
event = SecurityEvent(timestamp=time.time(), event_type=event_type, risk_score=risk_score, details=details)
self.security_events.append(event)
def generate_security_report(self) -> Dict:
if not self.security_events:
return {"status": "No events recorded"}
total_events = len(self.security_events)
high_risk_events = [e for e in self.security_events if e.risk_score > 0.7]
avg_risk = np.mean([e.risk_score for e in self.security_events])
event_types = {}
for event in self.security_events:
event_types[event.event_type] = event_types.get(event.event_type, 0) + 1
return {
"agent_id": self.agent_id,
"total_events": total_events,
"high_risk_events": len(high_risk_events),
"average_risk_score": round(avg_risk, 3),
"encryption_count": self.encryption_count,
"key_rotation_needed": self.encryption_count >= self.key_rotation_threshold,
"event_breakdown": event_types,
"security_status": "CRITICAL" if avg_risk > 0.7 else "WARNING" if avg_risk > 0.4 else "NORMAL"
}
Demo flow: creating agents, exchanging sessions, sending messages
A demo function creates two agents (Alice and Bob), performs hybrid key exchange, sends several messages, and prints decrypted results along with risk scores and security reports.
def demo_crypto_agent_system():
print(" Advanced Cryptographic Agent System Demo\n")
print("=" * 60)
alice = CryptoAgent("Alice")
bob = CryptoAgent("Bob")
print("\n1. Agents Created")
print(f" Alice ID: {alice.agent_id}")
print(f" Bob ID: {bob.agent_id}")
print("\n2. Establishing Secure Session (Hybrid Encryption)")
alice_public_key = alice.get_public_key_bytes()
bob_public_key = bob.get_public_key_bytes()
encrypted_session_key = alice.establish_session("Bob", bob_public_key)
bob.receive_session_key("Alice", encrypted_session_key)
print(f" ✓ Session established with {len(encrypted_session_key)} byte encrypted key")
print("\n3. Encrypting and Transmitting Messages")
messages = [
"Hello Bob! This is a secure message.",
"The launch codes are: Alpha-7-Charlie-9",
"Meeting at 3 PM tomorrow.",
"This is a very long message " * 100
]
for i, msg in enumerate(messages, 1):
encrypted = alice.encrypt_message("Bob", msg)
print(f"\n Message {i}:")
print(f" - Plaintext length: {len(msg)} chars")
print(f" - Ciphertext: {encrypted['ciphertext'][:60]}...")
print(f" - Risk Score: {encrypted['risk_score']:.3f}")
print(f" - Signature: {encrypted['signature'][:40]}...")
decrypted = bob.decrypt_message(encrypted)
print(f" - Decrypted: {decrypted[:60]}{'...' if len(decrypted) > 60 else ''}")
print(f" - Verification: {'✓ SUCCESS' if decrypted == msg else '✗ FAILED'}")
print("\n4. AI-Powered Security Analysis")
print("\n Alice's Security Report:")
alice_report = alice.generate_security_report()
for k, v in alice_report.items(): print(f" - {k}: {v}")
print("\n Bob's Security Report:")
bob_report = bob.generate_security_report()
for k, v in bob_report.items(): print(f" - {k}: {v}")
print("\n" + "=" * 60)
print("Demo Complete! Key Features Demonstrated:")
print("✓ Hybrid encryption (RSA + AES-GCM)")
print("✓ Digital signatures for authentication")
print("✓ AI-powered anomaly detection")
print("✓ Intelligent key rotation recommendations")
print("✓ Real-time security monitoring")
if __name__ == "__main__":
demo_crypto_agent_system()
Practical notes
- The example combines RSA for secure session key exchange and AES-GCM for efficient authenticated encryption.
- The AI/heuristic components are lightweight: they compute risk scores from message length, encryption frequency and recent events. In production, more sophisticated ML models and telemetry would improve detection.
- Keep key rotation thresholds and risk thresholds tuned for your environment.
This compact system demonstrates how adaptive intelligence can be layered on top of proven cryptographic primitives to enable self-monitoring, anomaly detection, and automated security recommendations.