Source code for pyetcd.client

"""module to connect to an etcd node and perform low rest API requests."""
import requests
from requests import RequestException

from pyetcd import EtcdResult, EtcdException

SUPPORTED_PROTOCOLS = ['http']


[docs]class ClientException(Exception): """ Exception for errors in Client class """
[docs]class Client(object): """ Etcd Client class. :param kwargs: Keyword arguments: - **host** (str, list(str), list(tuple)) - etcd node hostname or list of hostnames or list of tuples (hostname, port). Default is '127.0.0.1'. - **port** (int) - TCP port to connect to. Default is 2379. - **srv_domain** (str) - Domain name if DNS discovery is used - **version_prefix** (str) - API version prefix. Default is 'v2'. - **allow_reconnect** (bool) - If client fails to connect to a cluster node connect to the next node in the cluster. Default is True. - **protocol** (str) - Protocol to connect to the cluster. Default is 'http'. :raise ClientException: if any errors :raise NotImplementedError: if there is an attempt to use unsupported DNS discovery. """ def __init__(self, **kwargs): if 'srv_domain' in kwargs: raise NotImplementedError('DNS discovery is not implemented') self._allow_reconnect = kwargs.get('allow_reconnect', True) protocol = kwargs.get('protocol', 'http') if protocol in SUPPORTED_PROTOCOLS: self._protocol = protocol else: raise ClientException('Protocol %s is unsupported' % protocol) self._version_prefix = kwargs.get('version_prefix', 'v2') self._srv_domain = None self._hosts = [] self._urls = [] host = kwargs.get('host', '127.0.0.1') port = kwargs.get('port', 2379) if isinstance(host, list): for host_item in host: if isinstance(host_item, tuple): self._hosts.append(host_item) url = "{protocol}://{host}:{port}" \ .format(protocol=self._protocol, host=host_item[0], port=host_item[1]) self._urls.append(url) else: self._hosts.append((host_item, port)) url = "{protocol}://{host}:{port}" \ .format(protocol=self._protocol, host=host_item, port=port) self._urls.append(url) else: self._hosts.append((host, port)) url = "{protocol}://{host}:{port}" \ .format(protocol=self._protocol, host=host, port=port) self._urls.append(url) self._session = requests.Session()
[docs] def write(self, key, value, ttl=None): """ Write value to a key :param key: Key :param value: Value :param ttl: Keys in etcd can be set to expire after a specified number of seconds. You can do this by setting a TTL (time to live) on the key. :return: Result of operation. :rtype: EtcdResult :raise EtcdException: if etcd responds with error or HTTP error """ data = { 'value': value } if ttl and ttl > 0: data['ttl'] = int(ttl) return self._request_key(key, method='put', data=data)
[docs] def read(self, key, **kwargs): """ Read key value :param key: Key :return: Result of operation. :rtype: EtcdResult :raise EtcdException: if etcd responds with error or HTTP error """ return self._request_key(key, params=kwargs)
[docs] def delete(self, key): """ Delete a key :param key: Key :return: Result of operation. :rtype: EtcdResult :raise EtcdException: if etcd responds with error or HTTP error """ return self._request_key(key, method='delete')
[docs] def version(self): """ Return Etcd server version :return: string with Etcd server version. E.g. '2.3.7' :rtype: str """ response = self._request_call('/version') return response.version_etcdserver
[docs] def version_server(self): """ Same as .version() :return: string with Etcd server version. E.g. '2.3.7' :rtype: str """ return self.version()
[docs] def version_cluster(self): """ Return Etcd cluster version :return: string with Etcd cluster version. E.g. '2.3.0' :rtype: str """ response = self._request_call('/version') return response.version_etcdcluster
@property def health(self): """ :return: True if the node is healthy :rtype: bool """ return self._request_call('/health').health
[docs] def mkdir(self, directory): """ Create directory :param directory: string with directory name :return: Result of operation. :rtype: EtcdResult :raise EtcdException: if etcd responds with error or HTTP error """ data = { 'dir': True, 'prevExist': False } return self._request_key(directory, method='put', data=data)
[docs] def rmdir(self, directory, recursive=False): """ Delete directory :param directory: string with directory name :param recursive: recursively delete directory if not empty :return: Result of operation. :rtype: EtcdResult :raise EtcdException: if etcd responds with error or HTTP error """ params = { 'dir': 'true' } if recursive: params['recursive'] = 'true' return self._request_key(directory, params=params, method='delete')
[docs] def compare_and_swap( # pylint: disable=too-many-arguments self, key, value, prev_value=None, prev_index=None, prev_exist=None, ttl=None): """ This command will set the value of a key only if the client-provided conditions are equal to the current conditions. :param key: key string :param value: key value :param prev_value: checks the previous value of the key. :param prev_index: checks the previous modifiedIndex of the key. :param prev_exist: checks existence of the key: if prevExist is True, it is an update request; if prevExist is False, it is a create request. :param ttl: set ttl on the key in seconds :return: Result of operation. :rtype: EtcdResult :raise EtcdException: if etcd responds with error or HTTP error. :raise EtcdNodeExist: if condition ``prev_exist=False`` fails. :raise EtcdTestFailed: if condition ``prev_value='bar'`` fails. """ data = { 'value': value } if ttl: data['ttl'] = ttl params = None if prev_exist is not None: params = { 'prevExist': prev_exist } if prev_value is not None: params = { 'prevValue': prev_value } if prev_index is not None: params = { 'prevIndex': prev_index } return self._request_key(key, method='put', params=params, data=data)
[docs] def compare_and_delete(self, key, prev_value=None, prev_index=None): """ This command will delete a key only if the client-provided conditions are equal to the current conditions. :param key: the key :param prev_value: checks the previous value of the key. :param prev_index: checks the previous modifiedIndex of the key. :return: Result of operation. :rtype: EtcdResult :raise EtcdException: if etcd responds with error or HTTP error. :raise EtcdNodeExist: if any condition fails. """ params = None if prev_value is not None: params = { 'prevValue': prev_value } if prev_index is not None: params = { 'prevIndex': prev_index } return self._request_key(key, method='delete', params=params)
[docs] def update_ttl(self, key, ttl): """ Update key's ttl :param key: the key :param ttl: new ttl :return: Result of operation. :rtype: EtcdResult :raise EtcdException: if etcd responds with error or HTTP error. :raise EtcdKeyNotFound: if the key doesn't exist """ data = { 'ttl': ttl, 'refresh': 'true', 'prevExist': 'true' } return self._request_key(key, method='put', data=data)
[docs] def add_member(self, peer_urls): """ Add a node to the cluster. :param peer_urls: List of URL for inter-peer communication. For example:: ["http://10.0.0.10:2380"] :return: Information about the newly added node. :rtype: EtcdResult """ return self._request_call( '/v2/members', method='post', json={ 'peerURLs': peer_urls } )
[docs] def remove_member(self, member_id): """ Remove a node from the cluster. :param member_id: etcd identifier of the node. For example, ``272e204152``. """ self._request_call( '/v2/members/%s' % member_id, method='delete' )
def _request_key(self, key, method='get', params=None, **kwargs): """ Make an API call on a key :param key: key string. must start with '/' :param method: HTTP method in lower case (put, get, post, etc) :param params: dictionary with parameters that will be added to URI :type params: dict :param kwargs: keyword arguments to be passed down to _request_call() :return: Result of operation. :rtype: EtcdResult """ uri = "/{version_prefix}/keys{key}".format( version_prefix=self._version_prefix, key=key ) if params: uri += "?" sep = "" for param, value in sorted(params.items()): if isinstance(value, bool): value = str(value).lower() uri += "%s%s=%s" % (sep, param, value) sep = "&" return self._request_call(uri, method=method, **kwargs) def _request_call(self, uri, method='get', **kwargs): if self._allow_reconnect: urls = self._urls else: urls = [self._urls[0]] error_messages = [] for endpoint in urls: try: url = endpoint + uri return EtcdResult( getattr(self._session, method)( url, **kwargs ) ) except RequestException as err: error_messages.append("%s: %s" % (endpoint, err)) raise EtcdException( 'No more hosts to connect.\nErrors: %s' % '\n'.join(error_messages) )