Известные проблемы с параллелизмом и то, как GO обрабатывает их

Известные проблемы с параллелизмом и то, как GO обрабатывает их

28 июля 2025 г.

В этой статье я проведу вам следующие классические темы в одновременном программировании:

  • Продюсер-потребительскую проблему: Фундаментальный пример многопроцессной синхронизации, включающей общие ресурсы и стратегии буферизации.
  • Проблема столовой философов: Хорошо известная иллюстрация тупика, голода и обмена ресурсами в одновременных системах.
  • Ограничение скорости: Практический метод контроля доступа к ресурсам и поддержания стабильности системы при высокой нагрузке.

Каждая из этих тем дает ценную информацию о разработке надежных и эффективных параллельных систем. Давайте погрузимся.

Продюсер-потребительскую проблему

Представьте себе небольшую пекарню, которая делает кексы. На кухне есть люди, которые выпекают кексы, и на передней стойке есть клерки (потребители), которые обслуживают их клиентам. Но есть поворот: кексы не могут идти прямо от духовки к клиенту, которого они сначала должны быть помещены на небольшой поднос с ограниченным пространством. Если поднос заполняется, пекари должны подождать. Если поднос пуст, клерки должны ждать.

Эта, казалось бы, простая ситуация моделирует очень распространенную проблему в информатике: проблема-производителя. По сути, общее пространство - очередь или буфер, который соединяет два независимых процесса: один, который генерирует данные, и другой, который его потребляет. В программировании это особенно важно при работе с параллелизмом, где происходит несколько вещей одновременно.

Давайте представим этот сценарий, используя GO. Мы имитируем кучу пекарей, которые создают случайные «кексы» и команду клерков, которые «обслуживают» их. Во -первых, мы создадим простую версию, а затем улучшат ее, чтобы она мог изящно отключиться.

Шаг 1: Основной производитель

Каждый производитель будет случайным образом создавать число, немного спать и отправлять номер через канал. Вот как мы могли бы написать:

func cupcakeBaker(id int, stop <-chan struct{}, tray chan<- int) {
	for {
		select {
		case <-stop:
			return
		default:
			cupcake := rand.Intn(100)
			time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
			select {
			case tray <- cupcake:
				fmt.Printf("👨‍🍳 Baker %d baked cupcake %d\n", id, cupcake)
			case <-stop:
				return
			}
		}
	}
}

Здесь лоток - это канал, который действует как общий буфер. Канал остановки позволяет нам сигнализировать всем пекарям прекратить работу.

Шаг 2: Основной потребитель

Теперь давайте определим, как работают клерки. Они просто стоят на стойке и берут кекс с подноса, когда он доступен.

func counterClerk(id int, tray <-chan int) {
	for cupcake := range tray {
		fmt.Printf("🧍 Clerk %d served cupcake %d\n", id, cupcake)
	}
}

Цикл диапазона автоматически закончится, когда канал лотка будет закрыт, сигнализируя о том, что больше нет кексов.

Шаг 3: Собирайте все это вместе

Давайте подключим все это в основной функции и запустим нашу операцию кекса:

func main() {
	tray := make(chan int, 5)           // A tray that holds up to 5 cupcakes
	stop := make(chan struct{})         // Signal to stop production

	for i := 0; i < 5; i++ {
		go cupcakeBaker(i, stop, tray)
	}
	for i := 0; i < 5; i++ {
		go counterClerk(i, tray)
	}

	time.Sleep(10 * time.Second)
	close(stop)
}

На этом этапе производители перестанут создавать новые предметы, но потребители все еще могут работать в ожидании канала. Это небезопасно и может привести к висящей программе. Давайте улучшим это.

Изящное отключение: плавно закрывать пекарню закрыться

Давайте теперь улучшим нашу пекарню для кексов, чтобы она была должным образом отключена, не оставляя работников. Для этого нам нужен способ:

  1. Пусть все пекари знают, когда остановиться.
  2. Подождите, пока они все закончат.
  3. Скажите клеркам, что больше нет кексов.
  4. Подождите, пока все клерки не закончат, подавая оставшиеся кексы.

Чтобы координировать это изящное отключение, мы будем использовать Sync.WaitGroup полезный инструмент, который ожидает коллекции Goroutines для завершения.

Мы обновляем нашу функцию Baker, чтобы принять группу ожидания. Таким образом, основная рутина знает, когда каждый пекарь закончил свою работу.

func cupcakeBaker(id int, wg *sync.WaitGroup, stop <-chan struct{}, tray chan<- int) {
	defer wg.Done()

	for {
		select {
		case <-stop:
			return
		default:
			cupcake := rand.Intn(100)
			time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))

			select {
			case tray <- cupcake:
				fmt.Printf("👨‍🍳 Baker %d baked cupcake %d\n", id, cupcake)
			case <-stop:
				return
			}
		}
	}
}

Как и пекари, клерки теперь также используют группу ожидания. Как только все кексы подаются и поднос пуст (то есть канал закрыт), они завершают свою работу и завершение сигнала.

func counterClerk(id int, wg *sync.WaitGroup, tray <-chan int) {
	defer wg.Done()

	for cupcake := range tray {
		fmt.Printf("🧍 Clerk %d served cupcake %d\n", id, cupcake)
	}
}

Теперь все собирается вместе. Мы установили поднос и сигнал остановки, запускаем работников, позволяем им запустить некоторое время, а затем начинаем процесс выключения:

func main() {
	tray := make(chan int, 5)
	stop := make(chan struct{})
	bakerGroup := &sync.WaitGroup{}
	clerkGroup := &sync.WaitGroup{}

	for i := 0; i < 5; i++ {
		bakerGroup.Add(1)
		go cupcakeBaker(i, bakerGroup, stop, tray)
	}

	for i := 0; i < 5; i++ {
		clerkGroup.Add(1)
		go counterClerk(i, clerkGroup, tray)
	}

	time.Sleep(10 * time.Second)
	close(stop)
	bakerGroup.Wait()
	close(tray)
	clerkGroup.Wait()
}

С этой версией, через 10 секунд:

  • Пекаторы перестают печать кексы.
  • Клерки продолжают подавать, пока все оставшиеся кексы не будут взяты из подноса.
  • И наконец все уходят домой. Чисто, эффективно, и никто не застрял, работая поздно.

Проблема столовой философов

Представьте себе пять философов, сидящих за круглым столом. Они глубокие мыслители, но они также любят спагетти. Перед каждым философом находится тарелка, и между каждой парой пластин находится в общей сложности один вил. Правило: есть, философ нуждается в вилке слева и вилкой справа. Мышление может быть сделано в любое время, но еда требует сотрудничества.

Давайте превратим это в программу GO.

Шаг 1: Философы как горутины

Каждый философ представлен как goroutine. Вилки смоделированы как блокировки мутекс, которые представляют эксклюзивный доступ к вилкам.

Когда философ хочет съесть:

  1. Они блокируют первую вилку (скажем, эта слева).
  2. Затем они пытаются заблокировать вторую вилку (правая).
  3. Если они оба отлично! Они едят.
  4. После еды они разблокируют оба вилки и возвращаются к мышлению.

Вот как выглядит один философ в коде:

func philosopher(index int, firstFork, secondFork *sync.Mutex) {
	for {
		fmt.Printf("Philosopher %d is thinking\n", index)
		time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))

		firstFork.Lock()
		secondFork.Lock()

		fmt.Printf("Philosopher %d is eating\n", index)
		time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))

		secondFork.Unlock()
		firstFork.Unlock()
	}
}

Шаг 2: Одна вилка между каждым философом

Теперь давайте посадим наших пять философов и выпустим пять вилок:

func main() {
	forks := [5]sync.Mutex{}
	go philosopher(0, &forks[4], &forks[0])
	go philosopher(1, &forks[0], &forks[1])
	go philosopher(2, &forks[1], &forks[2])
	go philosopher(3, &forks[2], &forks[3])
	go philosopher(4, &forks[3], &forks[4])
	select {}
}

Каждый философ пытается схватить их левую и правую вилки, но обратите внимание на что -нибудь подозрительное?!

Ловушка тупика

Эта установка выглядит симметрично, но она опасна.

Представьте себе:

  • Каждый философ поднимает левую вилку.
  • Теперь каждый из них ждет правой форки, которая уже удерживается их соседом.

Никто не может двигаться вперед. Все застряли. Это тупик.

С точки зрения одновременности, эта ситуация удовлетворяет всем условиям Коффмана для тупика:

  1. Взаимное исключение: каждая вилка удерживается только одним философом.
  2. Держите и подождите: каждый держит одну форк и ждет другой.
  3. Никакого вытеснения: вилки не насильно забраны.
  4. Круговое ожидание: все ждут в круге.

Выйти из тупика

Как мы можем предотвратить это? Один простой трюк: измените порядок, в котором только один философ поднимает вилки.

Давайте отменим приказ для первого философа:

func main() {
	forks := [5]sync.Mutex{}
	go philosopher(0, &forks[0], &forks[4]) // reversed order
	go philosopher(1, &forks[0], &forks[1])
	go philosopher(2, &forks[1], &forks[2])
	go philosopher(3, &forks[2], &forks[3])
	go philosopher(4, &forks[3], &forks[4])
	select {}
}

Теперь нет циркулярного ожидания. Один перерыв в цепи достаточно, чтобы полностью избежать тупика.

Трилок подход

Некоторые библиотеки параллелизма предлагают метод Trylock (). Он пытается заблокировать вилку, и если она уже взята, она сдается. Давайте воспользуемся:

func philosopher(index int, leftFork, rightFork *sync.Mutex) {
	for {
		fmt.Printf("Philosopher %d is thinking\n", index)
		time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))

		leftFork.Lock()
		if rightFork.TryLock() {
			fmt.Printf("Philosopher %d is eating\n", index)
			time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
			rightFork.Unlock()
		}
		leftFork.Unlock()
	}
}

Эта версия предотвращает тупики, потому что никто не застрял, держа одну вилку навсегда. Но есть недостаток: философы могут голодать. Они продолжают пытаться и не могут есть, бесполезно вращаясь. Это без тупика, но не справедливо.

Каналы для спасения

Давайте вообще отбросим мутекс и используем каналы, чтобы представлять вилки. Каждая вилка теперь является каналом емкости 1. Если канал имеет значение, вилка доступна. Если он пуст, вилка используется.

Вот как выглядит этот подход:

func philosopher(index int, leftFork, rightFork chan bool) {
	for {
		fmt.Printf("Philosopher %d is thinking\n", index)
		time.Sleep(time.Duration(rand.Intn(1000)))

		select {
		case <-leftFork:
			select {
			case <-rightFork:
				fmt.Printf("Philosopher %d is eating\n", index)
				time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
				rightFork <- true
			default:
			}
			leftFork <- true
		}
	}
}

И установка:

func main() {
	var forks [5]chan bool
	for i := range forks {
		forks[i] = make(chan bool, 1)
		forks[i] <- true
	}
	go philosopher(0, forks[4], forks[0])
	go philosopher(1, forks[0], forks[1])
	go philosopher(2, forks[1], forks[2])
	go philosopher(3, forks[2], forks[3])
	go philosopher(4, forks[3], forks[4])
	select {}
}

Эта версия имитирует поведение Trylock, используя не блокирующие операторы. Если философ не может получить оба вилка, они поставили ту, которую они взяли, и возвращаются к мышлению.

Ограничение скорости

Представьте, что вы управляете входом в концертный зал. Люди (запросы) появляются в случайное время, стремясь войти внутрь. Но вы не можете позволить им все сразу же ограничить, сколько людей можно безопасно допустить в секунду. Таким образом, вы разработаете систему: каждый человек нуждается в билете (токен), а билеты печатаются в стабильном темпе, скажем, два в секунду.

Вы храните эти билеты в небольшом ведре. Если кто -то появляется, и в ведре есть билет, он берет один и входит. Если ведро пустое, они должны ждать, пока следующее будет напечатано.

Добро пожаловать в алгоритм ковша. Представьте себе это:

  • Производитель добавляет билеты в ведро с фиксированной скоростью.
  • Потребитель принимает билеты, чтобы войти.
  • Если билет нет, они должны ждать.

Давайте начнем кодировать это в Go с каналами.

Шаг 1: Определение структуры ковша токена

Мы будем использовать канал для представления ведра. Токены - это просто пустые значения (struct {}), а goroutine с тикером будет генерировать их с фиксированной скоростью.

type ChannelRate struct {
	bucket chan struct{}
	ticker *time.Ticker
	done   chan struct{}
}

Здесь ведро держит токены, тикер выпускает токен производство токенов, и сделано, позволяет нам закрыть все.

Шаг 2: строительство ограничителя ставок

Мы напишем конструктор, чтобы создать ограничитель, заполнить ведро и начать производить токены.

func NewChannelRate(rate float64, limit int) *ChannelRate {
	ret := &ChannelRate{
		bucket: make(chan struct{}, limit),
		ticker: time.NewTicker(time.Duration(1/rate * float64(time.Second))),
		done:   make(chan struct{}),
	}

	for i := 0; i < limit; i++ {
		ret.bucket <- struct{}{}
	}

	go func() {
		for {
			select {
			case <-ret.done:
				return
			case <-ret.ticker.C:
				select {
				case ret.bucket <- struct{}{}:
				default:
				}
			}
		}
	}()

	return ret
}

Токены периодически добавляются, но только если ведро не заполнено. Это важно, что мы не хотим переполнять это.

Шаг 3: В ожидании токена

Когда вступает запрос, мы называем Wait (). Он просто ждет, пока не будет доступен токен:

func (s *ChannelRate) Wait() {
	<-s.bucket
}

И очистить:

func (s *ChannelRate) Close() {
	close(s.done)
	s.ticker.Stop()
}

Взрываться против устойчивого потока

Допустим, у вас есть разрыв из 4 запросов, они все пройдут, если ведро будет заполнено. После этого новые токены прибывают каждые 500 мс (скорость = 2/с). Так что пятый запрос должен ждать. Таким образом, всплески разрешены, но средняя ставка все еще соблюдается.

Теперь вот введите поворот: что, если мы не хотим появиться в горутине на ограничитель?

Мы можем рассчитать токены на лету, а не периодически добавлять их. Мы храним:

  • ntokens: Сколько токенов в ведре
  • LastToken: Когда был получен последний токен

Вот структура:

type Limiter struct {
	mu         sync.Mutex
	rate       int
	bucketSize int
	nTokens    int
	lastToken  time.Time
}

Инициализация проста:

func NewLimiter(rate, limit int) *Limiter {
	return &Limiter{
		rate:       rate,
		bucketSize: limit,
		nTokens:    limit,
		lastToken:  time.Now(),
	}
}

Метод ожидания: не нужно goroutines

Этот метод обрабатывает все: проверка жетонов, расчет новых и ожидание при необходимости.

func (s *Limiter) Wait() {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.nTokens > 0 {
		s.nTokens--
		return
	}

	tElapsed := time.Since(s.lastToken)
	period := time.Second / time.Duration(s.rate)
	nTokens := int(tElapsed.Nanoseconds() / period.Nanoseconds())

	s.nTokens = nTokens
	if s.nTokens > s.bucketSize {
		s.nTokens = s.bucketSize
	}
	s.lastToken = s.lastToken.Add(time.Duration(nTokens) * period)

	if s.nTokens > 0 {
		s.nTokens--
		return
	}

	next := s.lastToken.Add(period)
	wait := next.Sub(time.Now())
	if wait >= 0 {
		time.Sleep(wait)
	}
	s.lastToken = next
}

Когда прибывает запрос, ограничитель сначала проверяет, есть ли какие -либо токены. Если есть, это просто потребляет один и позволяет запросу выполнить без промедления. Но если ведро пустое, все становится немного интереснее. Вместо того, чтобы блокировать неопределенное время или полагаться на фоновый процесс, ограничитель рассчитывает, сколько времени прошло с момента получения последнего токена. Используя это прошедшее время, он определяет, сколько новых жетонов должно было быть добавлено в то же время. Если достаточное количество токенов практически «доступно» в зависимости от времени, он обновляет свои внутренние счетчики и продолжается. Если нет, ограничитель ждет достаточно долго, пока не будет произведен следующий токен, а затем не будет продолжаться. Прелесть этого подхода в том, что он полностью автономный: на заднем плане нет дополнительных ружений или тикеров. Все обрабатывается в одной ветке с тщательной логикой на основе времени.

Версия ограничителя тарифов на основе канала прекрасно работает для простых приложений или внутренних услуг, разделенных среди небольшого числа пользователей. Это просто и легко рассуждать. Однако, когда строительные системы, которые обслуживают большое количество одновременных пользователей, таких как общедоступные API или мультитенантные платформы, ограничен на основе Mutex становится лучшим выбором. Это избегает нереста дополнительных goroutines и сохраняет память, что делает ее более эффективной и масштабируемой.

Go даже имеет встроенную библиотеку для этого:golang.org/x/time/rateПолем Для производственных систем это настоятельно рекомендуется.


Оригинал
PREVIOUS ARTICLE
NEXT ARTICLE