HTTP示例--TCGA
以 Simple Somatic Mutation (SSM) 为例, 其它类型可见这里
class TCGAVariantData(BaseModel):
    """TCGA-derived annotations for a single variant.

    Every field has a default, so an instance can be built from a partial
    record.

    NOTE(review): BaseModel and Field are not imported in this snippet;
    presumably they come from pydantic -- confirm.
    """

    # COSMIC identifier; the accompanying example uses "COSM476".
    cosmic_id: str | None = None
    # Tumor type names; defaults to a fresh empty list per instance.
    tumor_types: list[str] = Field(default_factory=list)
    # NOTE(review): presumably a fraction (the example uses 0.45) -- confirm
    # the unit against the producer of this data.
    mutation_frequency: float | None = None
    mutation_count: int | None = None
    affected_cases: int | None = None
    # The example uses "missense_variant".
    consequence_type: str | None = None
    # The example uses "Pathogenic".
    clinical_significance: str | None = None
示例
# Sample record whose keys mirror the fields of TCGAVariantData.
example = {
    "cosmic_id": "COSM476",
    "tumor_types": [
        "Skin Melanoma",
        "Colorectal Adenocarcinoma",
    ],
    "mutation_frequency": 0.45,
    "mutation_count": 90,
    "affected_cases": 85,
    "consequence_type": "missense_variant",
    "clinical_significance": "Pathogenic",
}
极其简化, 略过了 error 处理, cache 等重要细节
连接池
import asyncio
import functools
import ssl

import httpx
# Shared AsyncClient instances, keyed by verify setting. Only read or written
# while holding _pool_lock.
_connection_pools: dict[str, httpx.AsyncClient] = {}
_pool_lock = asyncio.Lock()


async def get_connection_pool(
    verify: ssl.SSLContext | str | bool,
    timeout: httpx.Timeout,
) -> httpx.AsyncClient:
    """Get or create the shared ``httpx.AsyncClient`` for a verify setting.

    Clients are cached per ``verify`` value: an ``SSLContext`` is keyed by its
    object identity, anything else by ``str(verify)``. A cached client that
    has been closed is replaced by a new one.

    Args:
        verify: SSL verification setting forwarded to httpx. Pass the same
            ``SSLContext`` object on every call to actually share a client;
            a new context object always yields a new client.
        timeout: Timeout for a newly created client. It is not part of the
            cache key, so it has no effect when a cached client is returned.

    Returns:
        The shared client for this verify setting.
    """
    # Create a key for the pool based on verify setting
    if isinstance(verify, ssl.SSLContext):
        pool_key = f"ssl_{id(verify)}"
    else:
        pool_key = str(verify)

    async with _pool_lock:
        pool = _connection_pools.get(pool_key)
        if pool is None or pool.is_closed:
            limits = httpx.Limits(
                max_keepalive_connections=20,  # Reuse connections
                max_connections=100,  # Total connection limit
                keepalive_expiry=30,  # Keep connections alive for 30s
            )
            pool = httpx.AsyncClient(
                verify=verify,
                http2=False,  # HTTP/2 can add overhead for simple requests
                timeout=timeout,
                limits=limits,
                # httpx uses an explicitly supplied transport as-is, so the
                # TLS, HTTP/2 and limit settings are repeated on the
                # transport; otherwise they would not apply to it.
                transport=httpx.AsyncHTTPTransport(
                    verify=verify,
                    http2=False,
                    limits=limits,
                    retries=0,  # We handle retries at a higher level
                ),
            )
            _connection_pools[pool_key] = pool
        return pool
证书配置
import certifi
@functools.cache
def get_ssl_context(tls_version: TLSVersion) -> SSLContext:
    """Return a client-side SSLContext pinned to a single TLS version.

    The context is built once per ``tls_version`` and reused afterwards.
    Reuse matters because get_connection_pool keys SSLContext-based pools by
    object identity: a fresh context per request would create a new client
    on every call and never close the old ones.

    Args:
        tls_version: Used as both the minimum and the maximum protocol
            version of the context.

    Returns:
        A shared context whose CA bundle is loaded from certifi. Because the
        object is shared between callers, treat it as read-only.
    """
    context = SSLContext(PROTOCOL_TLS_CLIENT)
    # Same value for both bounds, so only this exact version is allowed.
    context.minimum_version = tls_version
    context.maximum_version = tls_version
    context.load_verify_locations(cafile=certifi.where())
    return context
核心: 执行 http 请求
import httpx


async def execute_http_request(
    method: str,
    url: str,
    params: dict,
    verify: ssl.SSLContext | str | bool,
    headers: dict[str, str] | None = None,
) -> tuple[int, str]:
    """Execute one HTTP request, preferring the shared connection pool.

    Args:
        method: HTTP method, case-insensitive (GET or POST).
        url: Target URL.
        params: Sent as query parameters for GET and as the JSON body for
            POST.
        verify: SSL verification settings.
        headers: Optional custom headers.

    Returns:
        Tuple of (status_code, response_text). An empty response body is
        returned as "{}". An unsupported method yields
        (405, "Unsupported method <method>") without sending anything.

    Raises:
        httpx.HTTPError: Connection failures and timeouts are not caught
            here; they propagate to the caller unchanged.
    """
    HTTP_TIMEOUT_SECONDS = 120.0

    # Reject unknown methods up front so no client is acquired for them and
    # the caller still receives a (status, text) pair it can unpack.
    verb = method.upper()
    if verb not in ("GET", "POST"):
        return 405, f"Unsupported method {method}"

    custom_headers = headers or {}
    timeout = httpx.Timeout(HTTP_TIMEOUT_SECONDS)

    try:
        client = await get_connection_pool(verify, timeout)
        should_close = False
    except Exception:
        # Best-effort fallback: if the shared pool cannot be obtained, use a
        # one-off client and close it once the request is done.
        client = httpx.AsyncClient(
            verify=verify, http2=False, timeout=timeout
        )
        should_close = True

    try:
        if verb == "GET":
            resp = await client.get(
                url, params=params, headers=custom_headers
            )
        else:
            resp = await client.post(
                url, json=params, headers=custom_headers
            )
        # Substitute an empty JSON object for an empty body.
        return resp.status_code, resp.text or "{}"
    finally:
        # Only close if we created a new client
        if should_close:
            await client.aclose()
封装一下, 后续可以加上熔断等功能
async def call_http(
    method: str,
    url: str,
    params: dict,
    verify: ssl.SSLContext | str | bool = True,
    headers: dict[str, str] | None = None,
) -> tuple[int, str]:
    """Make an HTTP request by delegating to execute_http_request.

    This is a thin wrapper; it adds no retry or circuit-breaker logic of its
    own.

    Args:
        method: HTTP method (GET or POST).
        url: Target URL.
        params: Request parameters.
        verify: SSL verification settings.
        headers: Optional custom headers, forwarded unchanged.

    Returns:
        Tuple of (status_code, response_text).
    """
    return await execute_http_request(
        method, url, params, verify, headers=headers
    )
最后封装为 `request_api`, 用了最常用的 TLS 1.2
from typing import Literalimport jsonfrom ssl import PROTOCOL_TLS_CLIENT, SSLContext, TLSVersionimport _ssl
def _prepare_request_params( request: dict,) -> tuple[dict, dict | None]: params = request.copy() headers = json.loads(params.pop("_headers", "{}")) return params, headers
async def request_api(
    url: str,
    request: dict,
    method: Literal["GET", "POST"] = "GET",
    domain: str | None = None,
) -> tuple[int, str]:
    """Send an API request over TLS 1.2 and return the raw response.

    Args:
        url: Target URL.
        request: Request parameters. An optional "_headers" entry holds a
            JSON string of custom headers and is not sent as a parameter.
        method: HTTP method.
        domain: Not used by this function; accepted so existing callers
            that pass it keep working.

    Returns:
        Tuple of (status_code, response_text).
    """
    # Use the public enum rather than the private _ssl constant; it matches
    # the parameter type of get_ssl_context.
    verify = get_ssl_context(TLSVersion.TLSv1_2)

    params, headers = _prepare_request_params(request)

    return await call_http(
        method,
        url,
        params,
        verify=verify,
        headers=headers,
    )
- BRAF V600E 是一个常见的癌症相关突变 (BRAF 基因第 600 位氨基酸 Val → Glu)
- TCGA 数据库支持按氨基酸突变形式查询 (AA change format)
查询参数
- `cosmic_id`: COSMIC database ID
- `genomic_dna_change`: DNA-level mutation
- `gene_aa_change`: protein amino acid change
- `ssm_id`: unique somatic mutation ID
# GDC endpoint queried for simple somatic mutations (SSM).
url = "https://api.gdc.cancer.gov/ssms"

# Match records whose amino-acid change is BRAF V600E. The API takes the
# filter as a JSON string, so it is serialized before being sent.
ssm_filter = {
    "op": "in",
    "content": {
        "field": "gene_aa_change",
        "value": ["BRAF V600E"],
    },
}
requested_fields = (
    "cosmic_id",
    "genomic_dna_change",
    "gene_aa_change",
    "ssm_id",
)

params = {
    "filters": json.dumps(ssm_filter),
    "fields": ",".join(requested_fields),
    "format": "json",
    "size": "5",
}
异步请求
response, content = await request_api( url=url, method="GET", request=params, domain="gdc", )
返回
'{"data": {"hits": [{"id": "84aef48f-31e6-52e4-8e05-7d5b9ab15087", "gene_aa_change": ["BRAF V157E", "BRAF V299E", "BRAF V600E", "BRAF V640E"], "cosmic_id": ["COSM476"], "ssm_id": "84aef48f-31e6-52e4-8e05-7d5b9ab15087", "genomic_dna_change": "chr7:g.140753336A>T"}], "pagination": {"count": 1, "total": 1, "size": 5, "from": 0, "sort": "", "page": 1, "pages": 1}}, "warnings": {}}'
接下来可以分词获得各个参数, 就不细说了
直接喂给 AI 也可以, 不过会消耗更多 token, AI 时代应该想办法压缩内容节约上下文信息
other options
- retry
- timeout
- breaker
- …