32.2 企业安全配置
32.2.1 身份验证与授权
API 密钥管理
集中式密钥管理
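class APIKeyManager:
    """Fetch API keys from a Vault server and cache them in process memory.

    The Vault address and token are read from the ``VAULT_ADDR`` and
    ``VAULT_TOKEN`` environment variables. Secrets are read from the path
    ``/v1/secret/data/<key_name>`` and the field ``data.data.value`` of the
    JSON response is returned.

    Security notes for operators:
      * Cached keys stay in memory in plaintext for up to ``cache_ttl``
        seconds, so a key rotated or revoked in Vault can still be served
        from the cache until its entry expires.
      * The token in ``VAULT_TOKEN`` should be short-lived and bound to a
        policy that only allows reading the secrets this process needs.
    """

    # Characters accepted in each "/"-separated segment of a key name.
    _SEGMENT_CHARS = frozenset(
        "abcdefghijklmnopqrstuvwxyz"
        "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
        "0123456789-_."
    )
    # Seconds to wait for Vault before giving up; without a timeout a stalled
    # connection would block the caller indefinitely.
    _REQUEST_TIMEOUT = 10

    def __init__(self):
        """Read the Vault connection settings and start with an empty cache."""
        self.vault_url = os.getenv('VAULT_ADDR')
        self.vault_token = os.getenv('VAULT_TOKEN')
        self.key_cache = {}
        self.cache_ttl = 3600  # 1 hour

    def get_key(self, key_name: str) -> str:
        """Return the API key stored under ``key_name``.

        A cached value is returned while it is younger than ``cache_ttl``
        seconds; otherwise the key is fetched from Vault and cached.

        Raises:
            ValueError: if ``key_name`` is not a safe secret path.
            RuntimeError: if the Vault address or token is not configured.
            Exception: if Vault cannot be reached or returns a non-200 status.
        """
        # Check the cache first.
        cached = self.key_cache.get(key_name)
        if cached is not None and time.time() - cached['timestamp'] < self.cache_ttl:
            return cached['key']

        # Cache miss or expired entry: fetch from Vault.
        key = self._fetch_from_vault(key_name)

        # Cache the key together with the time it was fetched.
        self.key_cache[key_name] = {
            'key': key,
            'timestamp': time.time(),
        }
        return key

    @classmethod
    def _validate_key_name(cls, key_name: str) -> None:
        """Reject key names that could change which Vault path is requested.

        The name is interpolated into the request URL, so ``..`` segments,
        empty segments, and characters such as ``?``, ``#`` or ``%`` must not
        reach the URL: they would let a caller-supplied name address a
        different endpoint using this process's token.
        """
        if not isinstance(key_name, str) or not key_name:
            raise ValueError("key_name must be a non-empty string")
        for segment in key_name.split('/'):
            if not segment or segment in ('.', '..'):
                raise ValueError(f"Invalid key name: {key_name!r}")
            if not cls._SEGMENT_CHARS.issuperset(segment):
                raise ValueError(f"Invalid key name: {key_name!r}")

    def _fetch_from_vault(self, key_name: str) -> str:
        """Read the secret ``key_name`` from Vault and return its ``value`` field.

        Raises:
            ValueError: if ``key_name`` is not a safe secret path.
            RuntimeError: if the Vault address or token is not configured.
            Exception: on a transport error, a non-200 response, or a
                response body without ``data.data.value``.
        """
        self._validate_key_name(key_name)
        if not self.vault_url or not self.vault_token:
            # Fail with a clear message instead of requesting "None/v1/...".
            raise RuntimeError("VAULT_ADDR and VAULT_TOKEN must be set")

        base_url = self.vault_url.rstrip('/')
        try:
            response = requests.get(
                f"{base_url}/v1/secret/data/{key_name}",
                headers={'X-Vault-Token': self.vault_token},
                timeout=self._REQUEST_TIMEOUT,
            )

            if response.status_code == 200:
                data = response.json()
                return data['data']['data']['value']
            raise Exception(f"Failed to fetch key: {response.status_code}")
        except Exception as e:
            # Log only the exception text; never log the token or the
            # response body, which holds the secret.
            logger.error(f"Error fetching key from vault: {e}")
            raise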