Port check-openapi-source to OpenAPI 3.1.0

Signed-off-by: Kévin Commaille <zecakeh@tedomum.fr>
This commit is contained in:
Kévin Commaille 2023-09-28 13:15:52 +02:00
parent d5b73f4d56
commit 153c0edf21
No known key found for this signature in database
GPG key ID: 29A48C1F03620416

View file

@ -1,5 +1,10 @@
#! /usr/bin/env python #! /usr/bin/env python
#
# Validates the OpenAPI definitions under `../data/api`. Checks the request
# parameters and body, and response body. The schemas are validated against the
# JSON Schema 2020-12 specification and the examples are validated against those
# schemas.
# Copyright 2016 OpenMarket Ltd # Copyright 2016 OpenMarket Ltd
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
@ -47,17 +52,39 @@ def check_schema(filepath, example, schema):
example = resolve_references(filepath, example) example = resolve_references(filepath, example)
schema = resolve_references(filepath, schema) schema = resolve_references(filepath, schema)
resolver = jsonschema.RefResolver(filepath, schema, handlers={"file": load_file}) resolver = jsonschema.RefResolver(filepath, schema, handlers={"file": load_file})
jsonschema.validate(example, schema, resolver=resolver) validator = jsonschema.Draft202012Validator(schema, resolver)
validator.validate(example)
def check_parameter(filepath, request, parameter): def check_parameter(filepath, request, parameter):
schema = parameter.get("schema") schema = parameter.get('schema')
example = schema.get('example') example = parameter.get('example')
if not example:
example = schema.get('example')
if example and schema: if example and schema:
try: try:
print("Checking request schema for: %r %r" % ( print("Checking schema for request parameter: %r %r %r" % (
filepath, request filepath, request, parameter.get("name")
))
check_schema(filepath, example, schema)
except Exception as e:
raise ValueError("Error validating JSON schema for %r" % (
request
), e)
def check_request_body(filepath, request, body):
schema = body.get('schema')
example = body.get('example')
if not example:
example = schema.get('example')
if example and schema:
try:
print("Checking schema for request body: %r %r" % (
filepath, request,
)) ))
check_schema(filepath, example, schema) check_schema(filepath, example, schema)
except Exception as e: except Exception as e:
@ -67,44 +94,59 @@ def check_parameter(filepath, request, parameter):
def check_response(filepath, request, code, response): def check_response(filepath, request, code, response):
example = response.get('examples', {}).get('application/json')
schema = response.get('schema') schema = response.get('schema')
if example and schema: if schema:
try: for name, example in response.get('examples', {}).items():
print ("Checking response schema for: %r %r %r" % ( value = example.get('value')
filepath, request, code if value:
)) try:
check_schema(filepath, example, schema) print ("Checking response schema for: %r %r %r %r" % (
except jsonschema.SchemaError as error: filepath, request, code, name
for suberror in sorted(error.context, key=lambda e: e.schema_path): ))
print(list(suberror.schema_path), suberror.message, sep=", ") check_schema(filepath, value, schema)
raise ValueError("Error validating JSON schema for %r %r" % ( except jsonschema.SchemaError as error:
request, code for suberror in sorted(error.context, key=lambda e: e.schema_path):
), e) print(list(suberror.schema_path), suberror.message, sep=", ")
except Exception as e: raise ValueError("Error validating JSON schema for %r %r" % (
raise ValueError("Error validating JSON schema for %r %r" % ( request, code
request, code ), e)
), e) except Exception as e:
raise ValueError("Error validating JSON schema for %r %r" % (
request, code
), e)
def check_openapi_file(filepath): def check_openapi_file(filepath):
with open(filepath) as f: with open(filepath) as f:
openapi = yaml.safe_load(f) openapi = yaml.safe_load(f)
openapi_version = openapi.get('openapi')
if not openapi_version:
# This is not an OpenAPI file, skip.
return
elif openapi_version != '3.1.0':
raise ValueError("File %r is not using the proper OpenAPI version: expected '3.1.0', got %r" % (filepath, openapi_version))
for path, path_api in openapi.get('paths', {}).items(): for path, path_api in openapi.get('paths', {}).items():
for method, request_api in path_api.items(): for method, request_api in path_api.items():
request = "%s %s" % (method.upper(), path) request = "%s %s" % (method.upper(), path)
for parameter in request_api.get('parameters', ()): for parameter in request_api.get('parameters', ()):
if parameter['in'] == 'body': check_parameter(filepath, request, parameter)
check_parameter(filepath, request, parameter)
json_body = request_api.get('requestBody', {}).get('content', {}).get('application/json')
if json_body:
check_request_body(filepath, request, json_body)
try: try:
responses = request_api['responses'] responses = request_api['responses']
except KeyError: except KeyError:
raise ValueError("No responses for %r" % (request,)) raise ValueError("No responses for %r" % (request,))
for code, response in responses.items(): for code, response in responses.items():
check_response(filepath, request, code, response) json_response = response.get('content', {}).get('application/json')
if json_response:
check_response(filepath, request, code, json_response)
def resolve_references(path, schema): def resolve_references(path, schema):
@ -171,7 +213,7 @@ if __name__ == '__main__':
# Resolve the directory containing the OpenAPI sources, # Resolve the directory containing the OpenAPI sources,
# relative to the script path # relative to the script path
source_files_directory = os.path.realpath(os.path.join(script_directory, "../data")) source_files_directory = os.path.realpath(os.path.join(script_directory, "../data/api"))
# Walk the source path directory, looking for YAML files to check # Walk the source path directory, looking for YAML files to check
for (root, dirs, files) in os.walk(source_files_directory): for (root, dirs, files) in os.walk(source_files_directory):