Source code for apigraph.graph

from typing import Dict, FrozenSet, Optional, Set, Tuple, Union
from urllib.parse import unquote, urlsplit, urlunsplit

import inject
import networkx as nx
from jsonspec.pointer import Pointer
from openapi_orm.models import (
    Link,
    OpenAPI3Document,
    Operation,
    Parameter,
    PathItem,
    SecurityScheme,
)

from apigraph.loader import load_doc
from apigraph.types import (
    EdgeKey,
    HttpMethod,
    LinkDetail,
    LinkType,
    NodeKey,
    OperationDetail,
    OperationIdPathIndex,
    ParamKey,
)


[docs]class InvalidDocumentError(Exception): pass
[docs]class InvalidLinkError(InvalidDocumentError): pass
[docs]class InvalidBacklinkError(InvalidDocumentError): pass
[docs]class DuplicateOperationId(InvalidDocumentError): pass
[docs]class CircularDependencyError(InvalidDocumentError): pass
[docs]class InvalidSecuritySchemeError(InvalidDocumentError): pass
def _build_operation_id_path_index(doc: OpenAPI3Document) -> OperationIdPathIndex: """ OpenAPI spec allows to refer to an Operation by its name, using the `operationId` attribute (in links etc). To ease fetching an operation by its name we build an index of id -> (path, method) It's then trivial to fetch an Operation from doc by (path, method). Raises: DuplicateOperationId """ index: OperationIdPathIndex = {} for path, path_item in doc.paths.items(): for method in HttpMethod: operation = getattr(path_item, method.value) operation_id = getattr(operation, "operationId", None) if operation_id is not None: if operation_id in index: raise DuplicateOperationId(operation_id) index[operation_id] = (path, method) return index
[docs]class APIGraph: # We are using a multi-graph because it's possible to have multiple # links or backlinks between same endpoints i.e. multiple edges # having the same direction but with different chainIds. # In cases where they share a chainId then apigraph will consolidate # the redundant edges into one by preferring backlinks over links, and # arbitrarily in case of link+link or backlink+backlink redundancy. graph: nx.MultiDiGraph docs: Dict[str, OpenAPI3Document] # {<doc_uri>: <doc>} _indexes: Dict[str, OperationIdPathIndex] # {<doc_uri>: <index>} _chains: Dict[FrozenSet[str], nx.DiGraph] # {<matched chainIds>: <sub-graph>} def __init__(self, start_uri: str): self.graph = nx.MultiDiGraph() self.docs = {} self._indexes = {} self._chains = {} self._build(start_uri) self.graph = nx.freeze(self.graph)
[docs] def get_operation(self, node_key: NodeKey) -> Operation: """ Get operation element specified by `node_key` from relevant api doc. """ doc = self.docs[node_key.doc_uri] path = doc.paths[node_key.path] return getattr(path, node_key.method)
[docs] def chain_for_node( self, node_key: NodeKey, chain_id: str, traverse_anonymous: bool = True ) -> nx.MultiDiGraph: """ Get a subgraph view containing ancestors of `node_key` which are related via edges having this `chain_id`. NOTE: Includes the node identified by `node_key` itself. If `traverse_anonymous=True` then will return ancestors with no chain_id in additional to the requested chain_id (this is because chainId is an extension to OpenAPI and you may reach documents which do not use it, also it allows to avoid creating redundant links for multiple chains, null chain can be used as a default link). Raises: CircularDependencyError """ if traverse_anonymous: chain_key = frozenset([chain_id, None]) else: chain_key = frozenset([chain_id]) if chain_id not in self._chains: # materialize a view chain_view = nx.subgraph_view( self.graph, filter_edge=lambda _u, _v, key: key.chain_id in chain_key, ) # check for cycles... # if not nx.is_directed_acyclic_graph(chain_view): # raise CircularDependencyError( # node_key, # chain_id, # nx.simple_cycles(chain_view), # (generator) # ) # memoize self._chains[chain_key] = nx.freeze(chain_view) chain = self._chains[chain_key] # filter chain for ancestors of node_key dependencies = nx.ancestors(chain, node_key) | {node_key} return chain.subgraph(dependencies)
def _get_operation_id_path_index( self, doc_uri: str, doc: OpenAPI3Document ) -> OperationIdPathIndex: if doc_uri not in self._indexes: self._indexes[doc_uri] = _build_operation_id_path_index(doc) return self._indexes[doc_uri] @inject.params(_dc_settings="settings") def _build(self, start_uri: str, _dc_settings=None): doc = load_doc(start_uri) doc_index = self._get_operation_id_path_index(start_uri, doc) uris_to_crawl = set() def _pointer_from_ref(ref: str) -> Tuple[Pointer, str]: url = urlsplit(ref) if url.scheme: doc_uri = urlunsplit(url[:-1] + ("",)) # add remote doc into queue uris_to_crawl.add(doc_uri) else: # relative ref doc_uri = start_uri return Pointer(url.fragment), doc_uri def _decode_operation_ref(operation_ref: str) -> Tuple[str, str, str]: # we can assume that operationRef is like: `/paths/{path}/{method}` (_, path, method), doc_uri = _pointer_from_ref(operation_ref) path = unquote(path) return doc_uri, path, method def _decode_response_ref(response_ref: str) -> Tuple[str, str, str, str]: # we can assume that responseRef is like: `/paths/{path}/{method}/responses/{response_id}` (_, path, method, _, response_id), doc_uri = _pointer_from_ref(response_ref) path = unquote(path) return doc_uri, path, method, response_id def edge_args_for_backlink( backlink: Dict[str, Dict] ) -> Tuple[NodeKey, Optional[str], str]: """ Raises: InvalidBacklinkError """ response_ref = backlink.responseRef operation_id = backlink.operationId operation_ref = backlink.operationRef response_id = backlink.response chain_id = backlink.chainId if response_ref is not None: doc_uri, path, method, response_id = _decode_response_ref(response_ref) elif operation_id is not None and response_id is not None: path, method = doc_index[operation_id] doc_uri = start_uri elif operation_ref is not None and response_id is not None: doc_uri, path, method = _decode_operation_ref(operation_ref) else: # (should not be reachable due to pydantic model validation) raise InvalidBacklinkError(backlink) return NodeKey(doc_uri, path, method), chain_id, response_id def add_backlinks(to_node: NodeKey, backlinks): # NOTE: to/from nodes which are not in graph will be added with no attrs # (such nodes will have attrs filled when we get round to crawling their doc) for name, backlink in backlinks.items(): from_node, chain_id, response_id = edge_args_for_backlink(backlink) key = EdgeKey(chain_id, response_id) self.graph.add_edge( from_node, to_node, key=key, response_id=response_id, chain_id=chain_id, detail=LinkDetail( link_type=LinkType.BACKLINK, name=name, description=backlink.description, parameters=backlink.parameters, requestBody=backlink.requestBody, requestBodyParameters=backlink.requestBodyParameters, ), ) def edge_args_for_link(link: Link) -> Tuple[NodeKey, str, Link]: """ Raises: InvalidLinkError """ operation_id = link.operationId operation_ref = link.operationRef chain_id = link.chainId if operation_id is not None: path, method = doc_index[operation_id] doc_uri = start_uri elif operation_ref is not None: doc_uri, path, method = _decode_operation_ref(operation_ref) else: # (should not be reachable due to pydantic model validation) raise InvalidLinkError(link) return NodeKey(doc_uri, path, method), chain_id def add_links(from_node: NodeKey, response_id: str, links): # NOTE: to/from nodes which are not in graph will be added with no attrs # (such nodes will have attrs filled when we get round to crawling their doc) for name, link in links.items(): to_node, chain_id = edge_args_for_link(link) key = EdgeKey(chain_id, response_id) # in case of redundant edges, backlinks win # (and otherwise last-write wins) if ( from_node in self.graph and to_node in self.graph[from_node] and key in self.graph[from_node][to_node] and ( self.graph[from_node][to_node][key]["detail"].link_type is LinkType.BACKLINK ) ): continue self.graph.add_edge( from_node, to_node, key=key, response_id=response_id, chain_id=chain_id, detail=LinkDetail( link_type=LinkType.LINK, name=name, description=link.description, parameters=link.parameters, requestBody=link.requestBody, requestBodyParameters=link.requestBodyParameters, ), ) def get_parameters( source: Union[PathItem, Operation] ) -> Dict[ParamKey, Parameter]: """ NOTE: we expect duplicate keys to have been rejected by model validation """ return { ParamKey(name=param.name, location=param.in_): param for param in source.parameters } def get_security_schemes_for_operation( operation: Operation, ) -> Set[FrozenSet[SecurityScheme]]: """ outer set are the alternative security options for the operation inner set are security schemes required together by this option Raises: InvalidSecuritySchemeError """ # eliminate empty requirements dicts # (they are not prohibited by OpenAPI spec but are not meaningful) security_requirements = filter( lambda req: bool(req), ( operation.security if operation.security is not None else doc.security ), ) if doc.components: scheme_defs = doc.components.securitySchemes else: scheme_defs = {} try: return set( frozenset(scheme_defs[name] for name in requirement.keys()) for requirement in security_requirements ) except KeyError as e: raise InvalidSecuritySchemeError( e.args[0], operation, # scheme name ) from e for path, path_item in doc.paths.items(): for method in HttpMethod: operation = getattr(path_item, method.value) if operation is not None: node_key = NodeKey(doc_uri=start_uri, path=path, method=method) parameters = get_parameters(path_item) parameters.update(get_parameters(operation)) self.graph.add_node( node_key, detail=OperationDetail( path=path, method=method, summary=operation.summary, description=operation.description, parameters=parameters, requestBody=operation.requestBody, security_schemes=get_security_schemes_for_operation( operation ), ), ) for response_id, response in operation.responses.items(): add_links(node_key, response_id, response.links) add_backlinks(node_key, operation.backlinks) self.docs[start_uri] = doc # remove any docs we already crawled uris_to_crawl -= self.docs.keys() for uri in uris_to_crawl: self._build(uri)