<html><head><meta name="color-scheme" content="light dark"></head><body><pre style="word-wrap: break-word; white-space: pre-wrap;"># Copyright (C) 2015 JWCrypto Project Contributors - see LICENSE file

import copy
import json
from base64 import urlsafe_b64decode, urlsafe_b64encode
from collections import namedtuple
from collections.abc import MutableMapping

# Padding stripping versions as described in
# RFC 7515 Appendix C


def base64url_encode(payload):
    if not isinstance(payload, bytes):
        payload = payload.encode('utf-8')
    encode = urlsafe_b64encode(payload)
    return encode.decode('utf-8').rstrip('=')


def base64url_decode(payload):
    size = len(payload) % 4
    if size == 2:
        payload += '=='
    elif size == 3:
        payload += '='
    elif size != 0:
        raise ValueError('Invalid base64 string')
    return urlsafe_b64decode(payload.encode('utf-8'))


# JSON encoding/decoding helpers with good defaults

def json_encode(string):
    if isinstance(string, bytes):
        string = string.decode('utf-8')
    return json.dumps(string, separators=(',', ':'), sort_keys=True)


def json_decode(string):
    if isinstance(string, bytes):
        string = string.decode('utf-8')
    return json.loads(string)


class JWException(Exception):
    pass


class InvalidJWAAlgorithm(JWException):
    def __init__(self, message=None):
        msg = 'Invalid JWA Algorithm name'
        if message:
            msg += ' (%s)' % message
        super(InvalidJWAAlgorithm, self).__init__(msg)


class InvalidCEKeyLength(JWException):
    """Invalid CEK Key Length.

    This exception is raised when a Content Encryption Key does not match
    the required length.
    """

    def __init__(self, expected, obtained):
        msg = 'Expected key of length %d bits, got %d' % (expected, obtained)
        super(InvalidCEKeyLength, self).__init__(msg)


class InvalidJWEOperation(JWException):
    """Invalid JWS Object.

    This exception is raised when a requested operation cannot
    be execute due to unsatisfied conditions.
    """

    def __init__(self, message=None, exception=None):
        msg = None
        if message:
            msg = message
        else:
            msg = 'Unknown Operation Failure'
        if exception:
            msg += ' {%s}' % repr(exception)
        super(InvalidJWEOperation, self).__init__(msg)


class InvalidJWEKeyType(JWException):
    """Invalid JWE Key Type.

    This exception is raised when the provided JWK Key does not match
    the type required by the specified algorithm.
    """

    def __init__(self, expected, obtained):
        msg = 'Expected key type %s, got %s' % (expected, obtained)
        super(InvalidJWEKeyType, self).__init__(msg)


class InvalidJWEKeyLength(JWException):
    """Invalid JWE Key Length.

    This exception is raised when the provided JWK Key does not match
    the length required by the specified algorithm.
    """

    def __init__(self, expected, obtained):
        msg = 'Expected key of length %d, got %d' % (expected, obtained)
        super(InvalidJWEKeyLength, self).__init__(msg)


class InvalidJWSERegOperation(JWException):
    """Invalid JWSE Header Registry Operation.

    This exception is raised when there is an error in trying to add a JW
    Signature or Encryption header to the Registry.
    """

    def __init__(self, message=None, exception=None):
        msg = None
        if message:
            msg = message
        else:
            msg = 'Unknown Operation Failure'
        if exception:
            msg += ' {%s}' % repr(exception)
        super(InvalidJWSERegOperation, self).__init__(msg)


class JWKeyNotFound(JWException):
    """The key needed to complete the operation was not found.

    This exception is raised when a JWKSet is used to perform
    some operation and the key required to successfully complete
    the operation is not found.
    """

    def __init__(self, message=None):
        if message:
            msg = message
        else:
            msg = 'Key Not Found'
        super(JWKeyNotFound, self).__init__(msg)


# JWSE Header Registry definitions

# RFC 7515 - 9.1: JSON Web Signature and Encryption Header Parameters Registry
# HeaderParameters are for both JWS and JWE
JWSEHeaderParameter = namedtuple('Parameter',
                                 'description mustprotect supported check_fn')


class JWSEHeaderRegistry(MutableMapping):
    def __init__(self, init_registry=None):
        if init_registry:
            if isinstance(init_registry, dict):
                self._registry = copy.deepcopy(init_registry)
            else:
                raise InvalidJWSERegOperation('Unknown input type')
        else:
            self._registry = {}

        MutableMapping.__init__(self)

    def check_header(self, h, value):
        if h not in self._registry:
            raise InvalidJWSERegOperation('No header "%s" found in registry'
                                          % h)

        param = self._registry[h]
        if param.check_fn is None:
            return True
        else:
            return param.check_fn(value)

    def __getitem__(self, key):
        return self._registry.__getitem__(key)

    def __iter__(self):
        return self._registry.__iter__()

    def __delitem__(self, key):
        if self._registry[key].mustprotect or \
           self._registry[key].supported:
            raise InvalidJWSERegOperation('Unable to delete protected or '
                                          'supported field')
        else:
            self._registry.__delitem__(key)

    def __setitem__(self, h, jwse_header_param):
        # Check if a header is not supported
        if h in self._registry:
            p = self._registry[h]
            if p.supported:
                raise InvalidJWSERegOperation('Supported header already exists'
                                              ' in registry')
            elif p.mustprotect and not jwse_header_param.mustprotect:
                raise InvalidJWSERegOperation('Header specified should be'
                                              'a protected header')
            else:
                del self._registry[h]

        self._registry[h] = jwse_header_param

    def __len__(self):
        return self._registry.__len__()
</pre></body></html>