 
                        
                    Загрузка CSV-файла с 1 миллионом строк на серверную часть за 10 секунд
24 марта 2023 г.
Вставка больших записей из электронной таблицы (файл CSV) в базу данных является очень распространенным и трудная инженерная задача для решения по многим причинам. Современный стек данных ETL инструменты могут решить проблему.
А если?
- Вы специалист по javascript (NodeJS).
- Вы не хотите вкладывать деньги в инструмент ETL.
- Вы не специалист по данным, который знает, как создавать и поддерживать конвейеры данных.
Node.js — это мощный и гибкий механизм для чтения и записи данных в потоковая мода. Потоки обеспечивают эффективный способ обработки больших объемов данных, обрабатывая их небольшими порциями, а не загружая все сразу в память.
Поэтому мы собираемся решить эту проблему с помощью Node Stream.
Как нам поможет NodeJS Stream!!
В Node.js поток — это абстрактный интерфейс, представляющий последовательность данных. Поток можно рассматривать как поток данных, разделенный на фрагменты, и эти фрагменты можно обрабатывать постепенно по мере их доступности.
Потоки можно использовать для чтения или записи данных из различных источников, таких как файлы, сетевые сокеты или даже структуры данных в памяти. Потоки в Node.js реализуются с помощью генераторов событий, что означает, что их можно использовать асинхронно и неблокирующим образом.
Используется ли текущая архитектура для решения этой проблемы?
Шаг 1
Зона перетаскивания создается для предоставления места для перетаскивания CSV-файла.
const onDrop = useCallback((acceptedFiles) => {
    setFiles((prevFiles) => [...prevFiles, ...acceptedFiles]);
  }, []);
  const { getRootProps, getInputProps, isDragActive } = useDropzone({ onDrop });
return(
    <>
        <div
        className={`px-6 pt-5 pb-6 border-2 border-dashed rounded-lg ${
          isDragActive ? "border-green-400" : "border-gray-400"
        }`}
        {...getRootProps()}
      >
        <input {...getInputProps()} />
        <div className="flex flex-col items-center justify-center        text-center space-y-1">
          <PlusIcon className="w-8 h-8 text-gray-400" />
          <p className="text-sm font-medium text-gray-400">
            Drop files here or click to upload
          </p>
        </div>
      </div>
      {files.map((file) => (
        <p
          key={file.name}
          className="mt-2 text-sm font-medium text-gray-500 truncate"
        >
          {file.name} ({file.size} bytes)
        </p>
      ))}
      </>)
Шаг 2
Создайте rest api< /a> для запуска потоковой передачи.
 busboy.on(
            'file',
            async function (fieldname, file, filename, encoding, mimetype) {
              console.log('The file details are', filename, encoding, mimetype);
            }
          pipeline(stream1, stream2)
            )
Здесь мы использовали библиотеку Busboy для прямой потоковой передачи данных вместо их копирования на сервер и последующего перемещения. в MongoDB.
Функция конвейера из потока js узла будет использоваться для предоставления потока для синтаксического анализа, затем преобразования и вставки в Монгодб.
     pipeline(
                file,
                openCsvInputStream,
                headers_changes,
                dbClient.stream,
                (err) => {
                  if (err) {
                    console.log('Pipeline failed with an error:', err);
                  } else {
                    console.log('Pipeline ended successfully');
                  }
                }
              );
Шаг 3
Здесь поток papaparse является первым этапом конвейера. Papaparse — это высокоскоростная библиотека синтаксического анализа для преобразования огромных CSV-файлов в формат JSON. Но нам нужен поток огромного количества данных csv, которые будут переданы на следующие этапы преобразования.
Следующий код создает читаемый поток из входного потока papaparse:
const openCsvInputStream = (fileInputStream) => {
  const csvInputStream = new Readable({ objectMode: true });
  csvInputStream._read = () => {};
  Papa.parse(fileInputStream, {
    header: true,
    dynamicTyping: true,
    skipEmptyLines: true,
    step: (results) => {
      csvInputStream.push(results.data);
    },
    complete: () => {
      csvInputStream.push(null);
    },
    error: (err) => {
      csvInputStream.emit('error', err);
    },
  });
  return csvInputStream;
};
Шаг 4
Именно здесь на сцену выходят потоки трансформации.
    var headers_changes = new Transform({
          readableObjectMode: true,
          writableObjectMode: true,
        });
        headers_changes._transform = async function (data, enc, cb) {
          var newdata = await changeHeader({
            oldColumns,
           newColumns,
          });
          headers_changes.push(newdata);
          cb();
        };
Это преобразование изменения заголовка изменяет старые имена заголовков столбцов на новые имена заголовков. Загвоздка в том, что это поток-трансформер. После преобразования данных они готовы к вставке.
Шаг 5
Наступает последний этап вставки в Mongo DB. Но нам нужна пакетная вставка в MongoDB. Это называется массовой вставкой.
Здесь мы должны создать пакет записей и, когда пакет будет заполнен, вставить в mongodb.
Очистите пакет, чтобы освободить место для новых записей.
  async addToBatch(record) {
    try {
      this.batch.push(record);
      if (this.batch.length === this.config.batchSize) {
        await this.insertToMongo(this.batch);
      }
    } catch (error) {
      console.log(error);
    }
  }
 const writable = new Writable({
      objectMode: true,
      write: async (record, encoding, next) => {
        try {
          if (this.dbConnection) {
            await this.addToBatch(record);
            next();
          } else {
            this.dbConnection = await this.connect();
            await this.addToBatch(record);
            next();
          }
        } catch (error) {
          console.log(error);
        }
      },
    });
Чего мы достигли?
Поток — это мощная функция NodeJ. Даже если у вас есть ноутбук с 8 ГБ оперативной памяти, вы можете использовать его для анализа большого CSV-файла и его потоковой передачи в MongoDB с минимальным использованием ресурсов ЦП и ресурсов. БАРАН. Но может ли он выполнять несколько запросов параллельно? Дождитесь нашей следующей записи!
:::информация Примечание. Это тестирование проводится на компьютере MAC M1 с 8 ГБ ОЗУ.
:::
Оригинал
