Skip to content

HTTP示例--TCGA

官方文档

GDC Data User’s Guide

GDC API User’s Guide

以 Simple Somatic Mutation (SSM) 为例, 其它类型可见这里

class TCGAVariantData(BaseModel):
    """Summary of TCGA/GDC information about a single somatic variant.

    Every field is optional and defaults to ``None`` (or an empty list for
    ``tumor_types``), so a partially populated record is still valid.

    NOTE(review): the per-field meanings below are inferred from the field
    names and the example payload that accompanies this model -- confirm
    against the code that fills the model in.
    """

    # COSMIC identifier, e.g. "COSM476".
    cosmic_id: str | None = None
    # Tumor type names; default_factory gives each instance its own list.
    tumor_types: list[str] = Field(default_factory=list)
    # Presumably a fraction (the example uses 0.45) -- TODO confirm units.
    mutation_frequency: float | None = None
    mutation_count: int | None = None
    affected_cases: int | None = None
    # Consequence term, e.g. "missense_variant".
    consequence_type: str | None = None
    # Clinical significance label, e.g. "Pathogenic".
    clinical_significance: str | None = None

示例

# Sample payload whose keys match the fields of TCGAVariantData.
example = {
    "cosmic_id": "COSM476",
    "tumor_types": [
        "Skin Melanoma",
        "Colorectal Adenocarcinoma",
    ],
    "mutation_frequency": 0.45,
    "mutation_count": 90,
    "affected_cases": 85,
    "consequence_type": "missense_variant",
    "clinical_significance": "Pathogenic",
}

极其简化, 略过了 error 处理, cache 等重要细节

连接池

import asyncio
import functools
import ssl

import httpx
# Serialises creation of and lookup in the client registry below.
_pool_lock = asyncio.Lock()
# Shared clients, keyed by the ``verify`` setting they were built for.
_connection_pools: dict[str, "httpx.AsyncClient"] = {}


async def get_connection_pool(
    verify: ssl.SSLContext | str | bool,
    timeout: httpx.Timeout,
) -> httpx.AsyncClient:
    """Get or create the shared ``httpx.AsyncClient`` for a verify setting.

    Args:
        verify: SSL verification setting. An ``SSLContext`` is keyed by its
            object identity, so callers must pass the *same* context object
            to share a client; any other value is keyed by ``str(verify)``.
        timeout: Timeout used when a new client has to be created. It is
            ignored when an open client already exists for ``verify``.

    Returns:
        An open client. Callers must not close it; it is shared.
    """
    if isinstance(verify, ssl.SSLContext):
        pool_key = f"ssl_{id(verify)}"
    else:
        pool_key = str(verify)

    async with _pool_lock:
        pool = _connection_pools.get(pool_key)
        if pool is None or pool.is_closed:
            limits = httpx.Limits(
                max_keepalive_connections=20,  # Reuse connections
                max_connections=100,  # Total connection limit
                keepalive_expiry=30,  # Keep connections alive for 30s
            )
            pool = httpx.AsyncClient(
                # httpx does not apply client-level verify/http2/limits to an
                # explicitly supplied transport, so they are set on the
                # transport as well; otherwise the TLS settings and the
                # connection limits would be silently dropped.
                verify=verify,
                http2=False,  # HTTP/2 can add overhead for simple requests
                timeout=timeout,
                limits=limits,
                transport=httpx.AsyncHTTPTransport(
                    verify=verify,
                    http2=False,
                    limits=limits,
                    retries=0,  # We handle retries at a higher level
                ),
            )
            _connection_pools[pool_key] = pool
        return pool

证书配置

import certifi
@functools.cache
def get_ssl_context(tls_version: TLSVersion) -> SSLContext:
    """Return the shared client SSLContext pinned to one TLS version.

    The context is built once per ``tls_version`` and then reused. Building
    a context reloads the CA bundle, and the connection pool keys clients by
    the identity of the context object, so handing out a fresh context on
    every call would create a new client for every request.

    Args:
        tls_version: Used as both the minimum and the maximum protocol
            version, i.e. only this exact version is negotiated.

    Returns:
        A client-side context that trusts the certifi CA bundle. The object
        is shared between callers and should be treated as read-only.
    """
    context = SSLContext(PROTOCOL_TLS_CLIENT)
    context.minimum_version = tls_version
    context.maximum_version = tls_version
    context.load_verify_locations(cafile=certifi.where())
    return context

核心: 执行 http 请求

import httpx
async def execute_http_request(
    method: str,
    url: str,
    params: dict,
    verify: ssl.SSLContext | str | bool,
    headers: dict[str, str] | None = None,
) -> tuple[int, str]:
    """Execute the actual HTTP request using connection pooling.
    Args:
        method: HTTP method (GET or POST)
        url: Tsarget URL
        params: Request parameters
        verify: SSL verification settings
        headers: Optional custom headers
    Returns:
        Tuple of (status_code, response_text)
    Raises:
        ConnectionError: For connection failures
        TimeoutError: For timeout errors
    """
    HTTP_TIMEOUT_SECONDS = 120.0
    # Extract custom headers from params if present
    custom_headers = headers or {}
    # Use the configured timeout from constants
    timeout = httpx.Timeout(HTTP_TIMEOUT_SECONDS)
    # Use connection pooling with proper error handling
    use_pool = True
    if use_pool:
        try:
            client = await get_connection_pool(verify, timeout)
            should_close = False
        except Exception:
            # Fallback to creating a new client
            client = httpx.AsyncClient(
                verify=verify, http2=False, timeout=timeout
            )
            should_close = True
    else:
        # Create a new client for each request
        client = httpx.AsyncClient(
            verify=verify, http2=False, timeout=timeout
        )
        should_close = True
    try:
        # Make the request
        if method.upper() == "GET":
            resp = await client.get(
                url, params=params, headers=custom_headers
            )
        elif method.upper() == "POST":
            resp = await client.post(
                url, json=params, headers=custom_headers
            )
        else:
            return (
                f"Unsupported method {method}",
            )
        # Check for empty response
        if not resp.text:
            return resp.status_code, "{}"
        return resp.status_code, resp.text
    finally:
        # Only close if we created a new client
        if should_close:
            await client.aclose()

封装一下, 后续可以加上熔断等功能

async def call_http(
    method: str,
    url: str,
    params: dict,
    verify: ssl.SSLContext | str | bool = True,
    headers: dict[str, str] | None = None,
) -> tuple[int, str]:
    """Make an HTTP request by delegating to ``execute_http_request``.

    This is a thin wrapper today; it is the place where cross-cutting
    behaviour such as retries or a circuit breaker can be added later.

    Args:
        method: HTTP method (GET or POST)
        url: Target URL
        params: Request parameters
        verify: SSL verification settings
        headers: Optional custom headers, forwarded unchanged.

    Returns:
        Tuple of (status_code, response_text)
    """
    return await execute_http_request(
        method, url, params, verify, headers=headers
    )

最后封装为 request_api, 用了最常用的 TLS_1.2

from typing import Literal
import json
from ssl import PROTOCOL_TLS_CLIENT, SSLContext, TLSVersion
import _ssl
def _prepare_request_params(
    request: dict,
) -> tuple[dict, dict | None]:
    params = request.copy()
    headers = json.loads(params.pop("_headers", "{}"))
    return params, headers
async def request_api(
    url: str,
    request: dict,
    method: Literal["GET", "POST"] = "GET",
    domain: str | None = None,
) -> tuple[int, str]:
    """Send ``request`` to ``url`` over TLS 1.2 and return the raw result.

    Args:
        url: Target URL.
        request: Request parameters. An optional ``"_headers"`` entry (a
            JSON string) is split off and passed on as HTTP headers.
        method: HTTP method, GET or POST.
        domain: Accepted for interface compatibility; not used by this
            function.

    Returns:
        Tuple of (status_code, response_text).
    """
    # Use the public TLSVersion enum rather than the private _ssl constant.
    verify = get_ssl_context(TLSVersion.TLSv1_2)
    params, headers = _prepare_request_params(request)
    status, content = await call_http(
        method,
        url,
        params,
        verify=verify,
        headers=headers,
    )
    return status, content
  • BRAF V600E 是一个常见癌症相关突变 (BRAF 基因第 600 位氨基酸 Val → Glu)
  • TCGA 数据库支持按氨基酸突变形式查询 (AA change format)

查询参数

  • cosmic_id : COSMIC database ID
  • genomic_dna_change : DNA-level mutation
  • gene_aa_change : protein amino acid change
  • ssm_id : unique somatic mutation ID
# GDC simple-somatic-mutation (SSM) endpoint.
url = "https://api.gdc.cancer.gov/ssms"

# Filter expression: match records whose gene_aa_change is in the given list.
_aa_change_filter = {
    "op": "in",
    "content": {
        "field": "gene_aa_change",
        "value": ["BRAF V600E"],
    },
}

# The filter is sent JSON-encoded; all parameter values are strings.
params = {
    "filters": json.dumps(_aa_change_filter),
    "fields": "cosmic_id,genomic_dna_change,gene_aa_change,ssm_id",
    "format": "json",
    "size": "5",
}

异步请求

response, content = await request_api(
                url=url,
                method="GET",
                request=params,
                domain="gdc",
            )

返回

Terminal window
'{"data": {"hits": [{"id": "84aef48f-31e6-52e4-8e05-7d5b9ab15087", "gene_aa_change": ["BRAF V157E", "BRAF V299E", "BRAF V600E", "BRAF V640E"], "cosmic_id": ["COSM476"], "ssm_id": "84aef48f-31e6-52e4-8e05-7d5b9ab15087", "genomic_dna_change": "chr7:g.140753336A>T"}], "pagination": {"count": 1, "total": 1, "size": 5, "from": 0, "sort": "", "page": 1, "pages": 1}}, "warnings": {}}'

接下来可以用 json.loads 解析返回的 JSON, 取出各个字段, 就不细说了

直接喂给 AI 也可以, 不过会消耗更多 token, AI 时代应该想办法压缩内容节约上下文信息

  • retry
  • timeout
  • breaker