Написание многопоточного кода с помощью библиотеки параллельных задач (TPL)

Написание многопоточного кода с помощью библиотеки параллельных задач (TPL)

9 марта 2023 г.

Я большой поклонник выполнения нескольких потоков в приложении, и мне интересно посмотреть, как быстро параллелизм может решить сложный запрос.

Давайте разберемся с TPL со следующим содержанием:

  • Напишите простой цикл Parallel.For
  • Напишите простой цикл Parallel.ForEach
  • Отменить цикл "Parallel.For" или "Parallel.ForEach"
  • Обработка исключений в параллельных циклах

Цикл "Parallel.For"

Приведенная ниже функция берет имя каталога из командной строки и использует «цикл TPL For» для получения результатов. Давайте рассмотрим пример итерации каталога и выведем количество файлов внутри каталога, а не файлов вложенных папок, вместе с общим размером файла в байтах.

  public static void BasicParallelForLoop()
  {
      long totalSize = 0;
      Console.WriteLine("Enter valid directory path :");
      String args = Console.ReadLine();
      if (!Directory.Exists(args))
      {
          Console.WriteLine("The specified directory does not exist.");
          return;
      }
      String[] files = Directory.GetFiles(args);
      Parallel.For(0, files.Length,
          index =>
          {
              FileInfo fileInfo = new FileInfo(files[index]);
              long size = fileInfo.Length;
              Interlocked.Add(ref totalSize, size);
          });
      Console.WriteLine("Directory '{0}':, {1:N0} files, {2:N0} bytes", args, files.Length, totalSize);
  }

Вывод

В приведенном ниже результате показаны как положительные, так и отрицательные сценарии, т. е. для допустимых и недопустимых имен каталогов.

//Valid directory scenario
Enter valid directory path :
E:gifs
//Result
Directory 'E:gifs':, 7 files, 2,313,294 bytes
//Invalid directory Result
//The specified directory does not exist.

Цикл "Parallel.ForEach"

Цикл ForEach работает точно, как и цикл ForEach; разница заключается в синтаксисе. Давайте решим тот же пример, используя «цикл TPL ForEach».

Пример ссылки

Lets consider an example of iterating a directory and output the following things:
- The number of files inside the directory, not subfolder files though.
- Size in bytes

Давайте напишем код. Обратите внимание, что синтаксис стал более читабельным, так как файловый объект управляется напрямую, а не индексируется.

String[] files = Directory.GetFiles(args);
Parallel.ForEach(files, (currentFile) =>
{
    FileInfo fileInfo = new FileInfo(currentFile);
    long size = fileInfo.Length;
    Interlocked.Add(ref totalSize, size);
});
Console.WriteLine("Directory '{0}':, {1:N0} files, {2:N0} bytes", args, files.Length, totalSize);

Вывод

//Valid directory scenario
Enter valid directory path :
E:gifs
//Result
Directory 'E:gifs':, 7 files, 2,313,294 bytes
//Invalid directory Result
//The specified directory does not exist.

Отменить "Parallel.For" или "Parallel.ForEach"

Теперь, если пользователь хочет в любой момент отменить выполнение параллельного цикла, это можно сделать с помощью токена отмены. Циклы Parallel.For и Parallel.ForEach поддерживают отмену с помощью токенов отмены. Давайте возьмем пример итерации цикла For с целыми числами в диапазоне от 0 до 10000000 и отменим его выполнение между ними, используя ключевое слово "s".

Давайте напишем код.

Используйте объект класса ParallelOptions, чтобы предоставить токен отмены и максимальный параллелизм. Обратите внимание, что в приведенном ниже коде показано «Создание задачи» с использованием «Task.Factory», поэтому мы можем отменить ее из другого потока. В приведенном ниже примере показано, как применить токен отмены в цикле For. Аналогичным образом его можно использовать и в цикле ForEach.

  public static void BasicCancelLoop()
  {
      int[] nums = Enumerable.Range(0, 100000000).ToArray();
      CancellationTokenSource cts = new CancellationTokenSource();
      ParallelOptions parallelOptions = new ParallelOptions();
      parallelOptions.CancellationToken = cts.Token;
      parallelOptions.MaxDegreeOfParallelism = System.Environment.ProcessorCount;
      Console.WriteLine("Press any key to start. Press 's' to cancel.");
      Console.ReadKey();
      Task.Factory.StartNew(() =>
       {
           if (Console.ReadKey().KeyChar == 's')
               cts.Cancel();
           Console.WriteLine("press any key to exit");
       });
      try
      {
          Parallel.ForEach(nums, parallelOptions, (num) =>
          {
              Console.WriteLine("{0} on {1}", num);
              parallelOptions.CancellationToken.ThrowIfCancellationRequested();
          });
      }
      catch (OperationCanceledException e)
      {
          Console.WriteLine(e.Message);
      }
      finally
      {
          cts.Dispose();
      }
  }

Вывод

//print num values
....
....
....
//s key pressed
The operation was cancelled

Обработка исключений в параллельных циклах

Циклы Parallel.For и Parallel.ForEach не имеют механизма обработки исключений. Они напоминают обычные циклы for и foreach, т. е. необработанное исключение приводит к завершению цикла. Чтобы добавить обработку исключений в TPL, обработайте случаи, когда одни и те же исключения могут создаваться одновременно в нескольких потоках. Наконец, заверните все исключения из цикла в исключение System.AggregateException.

Код обработки исключений

  public static void Handle()
  {
      int[] data = new int[3] { 1, 2, 3 };
      try
      {
          ProcessDataInParallel(data);
      }
      catch (AggregateException ex)
      {
          Console.WriteLine(ex.Message);
      }
  }
  private static void ProcessDataInParallel(int[] data)
  {
      Parallel.ForEach(data, d =>
      {
          throw new ArgumentException($"Exception with value : {d}");
      });
  }

Вывод

В выводе будет показано несколько исключений.

One or more errors occurred. (Exception with value 1) (Exception with value 3) (Exception with value 2)

Пример GitHub

https://github.com/ssukhpinder/TaskParallelLibExample

Почему TPL?

Во-первых, TPL не только экономит время на развертывание и оптимизацию решения, но и имеет то преимущество, что позволяет подключаться к внутренним компонентам ThreadPool, которые не доступны публично, что может повысить производительность в различных критически важных задачах. сценарии. Как упоминалось выше, задачи используют преимущества новых очередей кражи работы в ThreadPool, которые могут помочь избежать конфликтов и проблем с когерентностью кэша, ведущих к снижению производительности. Использование Задач приводит к выделению большего количества объектов, а поддержание жизненного цикла Задачи (состояние, запросы на отмену, обработка исключений и т. д.) требует дополнительной работы и синхронизации.


Спасибо, что прочитали. Надеюсь, вам понравилась статья..!!

Также опубликовано здесь


Оригинал
PREVIOUS ARTICLE
NEXT ARTICLE