Понимание обнаружения служб в распределенных системах

Понимание обнаружения служб в распределенных системах

22 июня 2023 г.

Поработать пару лет в области науки о данных и одновременно получить степень в области разработки программного обеспечения — это все равно, что разделить себя на две половины. Половина требует изучения сложных математических принципов, всех типов архитектур нейронных сетей, хранения данных и бизнес-анализа, в то время как вторая заставляет вас узнать, как информация кодируется и используется компьютерами, как работают алгоритмы и как создавать всевозможные Приложения. Эти две области дополняют друг друга, вторая помогает первой стать лучшим специалистом по данным и лучше понять мир, в котором живут мои модели.

На последнем курсе бакалавриата у меня был курс, который мне очень понравился — «Распределенные системы». Мне это так понравилось, что я решил продолжить учебную программу и начал учиться по одной из лучших книг, которые я читал — «Designing Data-Intensive Applications» Мартина Клеппманна, а затем, объединив свои бакалаврская работа «Распределенные системы и обработка естественного языка для создания чат-бота».

Теперь, разработав пару распределенных систем и получив больше опыта в них, я решил начать писать пару статей, объясняющих и реализующих сервисы, шаблоны и принципы из этой области, начиная с одного из самых важных сервисов во всех распределенных системах.

Постановка проблемы.

В распределенных службах все приложение представлено службами — веб-серверами, работающими на разных машинах и отвечающими за разные функции приложения. Возьмем, к примеру, функциональные возможности Spotify. Spotify не только позволяет пользователям воспроизводить музыку на разных устройствах, но также позволяет исполнителям загружать свои песни, создавать музыкальные рекомендации, предоставлять тексты песен, следить за исполнителями, управлять финансами и т. д. Все эти функциональные возможности реализуются различными службами или репликами служб, имеющими сотни или тысячи служб, которые должны взаимодействовать друг с другом. Однако в такой архитектуре машины, на которых работают службы, приходят и уходят, поэтому их расположение всегда меняется. Требуется механизм для хранения, обновления и удаления информации о Сервисах, и обычно для этой цели используется Service Discovery.

Figure 1: A distributed system without a Service Discovery.

Что такое сервис Discovery?

Обнаружение служб обычно представляет собой службу, разработанную как реестр служб, в которой хранится вся информация о службах и как точка объявления о том, что службы все еще работают. Обычно это одна или несколько веб-служб, предоставляющих API для регистрации, обновления и удаления служебной информации, а также некоторые конечные точки для отправки запросов пульса. API предоставляет в основном функции CRUD. Если вы не знакомы с операциями CRUD, CRUD означает:

* Create — Создание сущностей в хранилище данных. * Чтение — Чтение значений сущностей из хранилища данных. * Обновление — Обновление значений сущностей в хранилище данных. * Удалить — Удаление объектов из хранилища данных.

Когда служба в распределенных системах запускается, она отправляет запрос на создание службе обнаружения для регистрации, чтобы служба обнаружения знала о ее существовании. Затем служба делает второй запрос на чтение, чтобы получить информацию о службах, с которыми она должна связаться или отправить запрос. Добавлена ​​операция обновления для повторного развертывания службы в другом месте или для перенаправления нагрузки на другую службу. Наконец, создается операция Удалить, чтобы просто удалить службу в случае падения сервера или вручную.

Figure 2: Registering to Service Discovery.

Теперь давайте больше узнаем о запросах пульса.

Запросы пульса.

В вашей голове мог возникнуть естественный вопрос. Как Service Discovery узнает, что служба все еще работает после ее регистрации? А вот и запросы Heartbeat. После регистрации и получения информации о запрошенных сервисах сервис начинает раз в t секунд или минут отправлять запрос в Service Discovery, сообщая, что сервис все еще жив. Если служба не отправляет запросы пульса для t x 2 или 3, то в зависимости от реализации служба обнаружения удаляет или инициирует проверку работоспособности службы. Обычно в запросах пульса нет полезной нагрузки, но опять же, в зависимости от реализации, они могут содержать некоторые данные.

Реализация Python.

Далее я собираюсь показать, как простое обнаружение служб можно реализовать на Python в виде веб-приложения Flask. Следуя CRUD API, каждая конечная точка должна принимать определенные методы HTTP:

* Создать ПОСТ; * Читать: ПОЛУЧИТЬ; * Обновление: ПОСТАВИТЬ; * Удалить: УДАЛИТЬ

Обычно в зависимости от объема хранимых данных конечная точка чтения может быть разделена на две: read_all и read_some или read_one. Это зависит от разработчика и архитектуры. В этом примере мы собираемся реализовать две конечные точки чтения:

* читать все; * прочитать немного;

Также вместо подключения базы данных мы собираемся реализовать класс ServiceRegistery, в котором будет храниться информация о службах. Служебная информация будет представлять собой простой словарь со следующими полями:

* name — название сервиса. * host — хост сервиса. * порт — сервисный порт.

Информация об услуге будет выглядеть следующим образом.

{
    "name" : "playlist-service-1",
    "host" : "234:48:194:123",
    "port" : 6000
}

Листинг 1. Схема учетных данных службы.

Для сервисных операций CRUD ServiceRegistery имеет одно поле и пять функций (помните, что мы реализуем 2 формы запросов на чтение). Поле — это простой словарь, который мы используем для хранения информации о службах. Он настраивается в конструкторе следующим образом:

class ServiceRegistery:
    def __init__(self):
        self.services = {}
        self.heartbeats = {}
        self.heartbeats_lock = threading.Lock()
        self.time_threashold = 30

heartbeats, heartbeats_lock и time_threashold используются для управления запросами пульса и будут рассмотрены позже.

Операция создания принимает тело запроса, и если служба, описанная в теле запроса, не существует, она добавляется, в противном случае возвращается сообщение об ошибке следующего вида:

def create(self, request_body : dict):
    '''
        This function adds a service in service registries.
    :param request_body: dict
        The request body.
    :returns: dict, int
        The response returned.
        The status code.
    '''
    # Getting the name of the service.
    name = request_body["name"]

    # Checking the presence of the service.
    if name in self.services:
        return {
            "message" : "Service already registered!"
        }, 208
    else:
        # Adding the service to the service and heartbeat registries.
        self.services[name] = request_body
        self.heartbeats_lock.acquire()
        self.heartbeats[name] = time.time()
        self.heartbeats_lock.release()
        return request_body, 200

Как видно из приведенного выше листинга, в случае ошибки возвращается статус запроса 208. Это состояние называется Already Reported. Обычно используется, когда ресурс запрашивается несколько раз.

Операция чтения всех не требует каких-либо параметров, поэтому она просто возвращает словарь с информацией о службах и кодом состояния 200. В случае чтения некоторых функций она принимает список запрошенных служб, если все службы доступны. представлены, то они возвращаются в виде словаря, иначе список отсутствующих имен служб и код ошибки 404.

def read_all(self):
    '''
        This function returns the dictionary of the all registered services.
    :return: dict, int
        The service registry.
        The status code.
    '''
    return self.services, 200

def read_some(self, services_list : list):
    '''
        This function returns the dictionary with the information of the requested services.
    :param services_list: list
        The list containing the names of services requested.
    :return: dict, int
        The dictionary with the service information or the error message.
        The status code.
    '''
    # Getting the missing services in the service registry from the requested ones.
    services_dif = set(services_list).difference(self.services.keys())

    # If there are missing services then the list of missing services is returned.
    if len(services_dif) > 0:
        return {
            "missing_services" : list(services_dif)
        }, 404
    else:
        # If all services are present the information of those services is returned.
        return {
            service_name : self.services[service_name]
            for service_name in services_list
        }, 200

При обновлении информации о службах функция обновления считывает тело запроса и обновляет с его помощью информацию о службе. В случае существования этой службы возвращается обновленная информация о службе и код состояния 200, в противном случае возвращается сообщение об ошибке с состоянием ошибки 404.

def update(self, request_body : dict):
    '''
        This function updates the information about a service.
    :param request_body: dict
        The body of the request.
    :return: dict, int
        The new values of the service.
        The status code.
    '''
    # Getting the name of the service.
    name = request_body["name"]

    # If the service is present then it's information is updated.
    if name in self.services:
        self.services[name].update(request_body)
        return self.services[name], 200
    else:
        # If the service is missing the error message is returned.
        return {
            "message" : "No such service!"
        }, 404

Окончательное удаление службы выполняется следующей функцией ServiceRegistey:

def delete(self, request_body : dict):
    '''
        This function deletes a service from service registry.
    :param request_body: dict
        The body of the request.
    :return: dict, int
        The new values of the service.
        The status code.
    '''
    # Getting the name of the service from request body.
    name = request_body["name"]
    if name in self.services:
        # Getting the service information and deleting it from service registry.
        service_info = self.services[name]
        del self.services[name]


        # Deleting the service from heartbeats.
        self.heartbeats_lock.acquire()
        del self.heartbeats[name]
        self.heartbeats_lock.release()
        return service_info, 200
    else:
        # Returning the error message if the requested service isn't present in registry.
        return {
            "message" : "No such service!"
        }, 404

Все эти функции вызываются в конечных точках Service Discovery, перечисленных ниже:

# Importing all needed modules.
from flask import Flask, request
from service_registry import ServiceRegistry
import threading

# Creating the Flask application.
app = Flask(__name__)

# Creation of the Service registry.
service_registry = ServiceRegistry()

@app.route("/service", methods=["POST"])
def create():
    '''
        This function processes the registration requests.
    '''
    request_body = request.json
    response, status_code = service_registry.create(request_body)
    return response, status_code

@app.route("/service", methods=["GET"])
def read():
    '''
        This function processes the requests of getting information about services.
    '''
    request_body = request.json
    if request_body is not None:
        response, status_code = service_registry.read_some(request_body["services"])
    else:
        response, status_code = service_registry.read_all()
    return response, status_code

@app.route("/service", methods=["PUT"])
def update():
    '''
        This function processes the update requests.
    '''
    request_body = request.json
    response, status_code = service_registry.update(request_body)
    return response, status_code

@app.route("/service", methods=["DELETE"])
def delete():
    '''
        This function processes the delete requests.
    '''
    request_body = request.json
    response, status_code = service_registry.delete(request_body)
    return response, status_code

# Running the main service.
app.run()

Реализация конечной точки Heartbeat.

Чтобы реализовать способность Service Discovery находить службы, которые больше не работают, как было сказано ранее, после регистрации и получения информации о других службах служба начинает отправлять запросы пульса в Service Discovery. После каждого полученного запроса Service Discovery обновляет последнюю временную метку пульса службы. Кроме того, служба обнаружения работает в отдельном потоке, в котором в момент времени t проверяется время, прошедшее с момента последнего запроса подтверждения. Если прошедшее время меньше t, то сервис объявляется мертвым, и в зависимости от реализации предпринимаются различные действия.

Во-первых, ниже показана функция добавления пульса в ServiceRegistery:

def add_heartbeat(self, service_name : str):
    '''
        This function updates the heartbeat timestamp for a service
    :param service_name: str
        The name of service sending the heartbeat request.
    :return: dict, int
        The new values of the service.
        The status code.
    '''
    # Checking if the service is registered.
    if service_name not in self.heartbeats:
        # Returning the error message and 404 status code.
        return {
            "message" : "Not registered service!"
        }, 404
    else:
        # Updating the last heartbeat timestamp.
        self.heartbeats_lock.acquire()
        self.heartbeats[service_name] = time.time()
        self.heartbeats_lock.release()

        # Returning the success message and the 200 status code.
        return {
            "message" : "Heartbeat received!"
        }, 200

Эта функция вызывается при вызове конечной точки пульса (перечисленной ниже):

@app.route("/heartbeat/<service>", methods=["POST"])
def heartbeat(service):
    '''
        This function processes the heartbeat requests.
    '''
    response, status_code = service_registry.add_heartbeat(service)
    return response, status_code

Когда служба хочет отправить запрос пульса, она должна отправить свое имя в качестве параметра, таким образом будет обновлена ​​временная метка последнего запроса пульса. Наконец, ниже представлен код функции, проверяющей сердцебиение, она работает как отдельный поток и просто печатает имя службы, которую считает неактивной.

def check_heartbeats(self):
    '''
        This function checks the last heartbeats of the registered services every 10 seconds
        and prints the one that didn't send a heartbeat request more than 30 seconds.
    '''
    while True:
        self.heartbeats_lock.acquire()
        # Iterating through services and checking how long a go the service sent the last heartbeat
        # request.
        for service in self.heartbeats:
            if time.time() - self.heartbeats[service] > self.time_threashold:
                print(f"Service - {service} seems to be dead!")
        self.heartbeats_lock.release()
        time.sleep(10)

Эта функция выполняется в отдельном потоке основного модуля.

# Starting up the processing of checking heartbeats.
threading.Thread(target=service_registry.check_heartbeats).start()

# Running the main service.
app.run()

Вывод.

Распределенные системы — это обширная область компьютерных наук и технология, которую нужно знать и понимать. Service Discovery — это специальный сервис в такой системе, который хранит и делает доступной информацию о других сервисах в системе. Кроме того, он поддерживает доступность услуг. Это также служба, с которой следует начинать при разработке распределенной службы, поскольку каждая служба начинает свою деятельность с регистрации в этой службе.

Полный код сервиса с комментариями можно найти здесь: ссылка на GitHub

Автор Păpăluță VasileLinkedIn: https://www. linkedin.com/in/vasile-păpăluță/

Инстаграм: https://www.instagram.com/science_kot/

GitHub: https://github.com/ScienceKot

:::информация Также опубликовано здесь.

:::


Оригинал
PREVIOUS ARTICLE
NEXT ARTICLE