Как обработать очередь SQS с помощью функции lambda (не через запланированные события)?


вот упрощенная схема, которую я пытаюсь сделать работу:

http-запросы -- > (API шлюза + лямбда A) --> SQS --> (лямбда B ?????) -- > DynamoDB

поэтому он должен работать так, как показано: данные поступают из многих http-запросов (до 500 в секунду, например) помещается в очередь SQS моей лямбда-функцией A. Затем другая функция, B, обрабатывает очередь: читает до 10 элементов (на некоторой периодической основе) и записывает их в DynamoDB с помощью Batchwriteitem: выполняет.

проблема в том, что я не могу понять, как вызвать вторую лямбда-функцию. Он должен вызываться часто, несколько раз в секунду (или по крайней мере один раз в секунду), потому что мне нужны все данные из очереди, чтобы попасть в DynamoDB как можно скорее (поэтому вызов лямбда-функции B через запланированные события, как описано здесь не вариант)


почему я не хочу писать непосредственно в DynamoDB, без ЗК?

это было бы здорово для меня, чтобы избежать использования SQS вообще. Проблема, которую я пытаюсь решить с помощью SQS, - это дросселирование DynamoDB. Даже не само регулирование, а то, как оно обрабатывается при записи данных в DynamoDB с помощью AWS SDK: при записи записей по одному и их дросселировании AWS SDK молча повторяет запись, что приводит к увеличению времени обработки запроса с точки зрения клиента http.

поэтому я хотел бы временно сохраните данные в очереди, отправьте ответ" 200 OK " обратно клиенту, а затем получите очередь, обработанную отдельной функцией, запись нескольких записей с помощью одного вызова DynamoDB BatchWriteItem (который возвращает необработанные элементы вместо автоматического повтора, в случае дросселирования). я бы даже предпочел потерять некоторые записи вместо увеличения задержки между получением и сохранением записи в DynamoDB

UPD: если кто-то заинтересован, я нашел как заставить aws-sdk пропустить автоматические повторы при дросселировании: есть специальный параметр maxRetries. В любом случае, будем использовать Kinesis, как предлагается ниже

7   51   2016-01-08 16:52:15

7 ответов:

[Это напрямую не отвечает на ваш явный вопрос, поэтому, по моему опыту, он будет понижен :) однако я отвечу на фундаментальную проблему, которую вы пытаетесь решить.]

способ, которым мы принимаем поток входящих запросов и передаем их в функции AWS Lambda для записи в темпе DynamoDB, заключается в замене SQS в предлагаемой архитектуре потоками Amazon Kinesis.

потоки Kinesis могут управлять функциями AWS Lambda.

Кинезис потоков гарантируйте заказ доставленных сообщений для любого заданного ключа (хорошо для упорядоченных операций с базой данных).

потоки Kinesis позволяют указать, сколько функций AWS Lambda может выполняться параллельно (по одной на раздел), которые могут быть согласованы с вашей емкостью записи DynamoDB.

потоки Kinesis могут передавать несколько доступных сообщений в одном вызове функции AWS Lambda, что позволяет проводить дальнейшую оптимизацию.

Примечание: это действительно сервис AWS Lambda, который читает затем из Amazon Kinesis streams вызывается функция, а не потоки Kinesis, напрямую вызывающие AWS Lambda; но иногда проще визуализировать, как Kinesis управляет ею. Результат для пользователя почти такой же.

вы не можете сделать это напрямую интегрируя SQS и лямбда, к сожалению. Но пока не волнуйтесь слишком сильно. Есть решение! Вам нужно добавить еще один сервис amazon в микс, и все ваши проблемы будут решены.

http requests --> (Gateway API + lambda A) --> SQS + SNS --> lambda B --> DynamoDB

вы можете вызвать уведомление SNS для второго лямбда-сервиса, чтобы запустить его. После запуска он может слить очередь и записать все результаты в DynamoDB. Чтобы лучше понять возможные источники событий для лямбда проверить эти документы.

с 28 июня 2018 года вы можете использовать SQS для запуска функций AWS Lambda изначально. Обходные пути больше не нужны!

https://aws.amazon.com/blogs/aws/aws-lambda-adds-amazon-simple-queue-service-to-supported-event-sources/

другим решением было бы просто добавить элемент в SQS, вызвать целевую лямбда-функцию с событием, чтобы она была асинхронной.

асинхронная лямбда может затем получить от SQS столько элементов, сколько вы хотите, и обработать их.

Я бы также добавил запланированный вызов асинхронной лямбды для обработки любых элементов в очереди, которые были в ошибке.

[UPDATE] теперь вы можете настроить лямбда-триггер на новое сообщение в очереди

может быть, более экономичным решением было бы сохранить все в SQS (как есть), а затем запустить запланированное событие, которое вызывает многопоточную лямбда-функцию, которая обрабатывает элементы из очереди?

таким образом, ваш работник очереди может точно соответствовать вашим ограничениям. Если очередь пуста, функция может закончить преждевременно или начать опрос в одном потоке.

Kinesis звучит как чрезмерное убийство для этого случая – вам не нужен исходный порядок, например. Плюс запуск нескольких лямбд одновременно, безусловно, дороже, чем запуск только одного многопоточного лямбда.

ваша лямбда будет связана с вводом-выводом, внешними вызовами сервисов AWS, поэтому одна функция может очень хорошо подходить.

вот мое решение этой проблемы:

HTTP request --> DynamoDb --> Stream --> Lambda Function

в этом решении вы должны настроить поток для таблицы. Поток обрабатывается с помощью лямбда-функции, которую вы будете писать, и все. Нет необходимости использовать SQS или что-нибудь еще.

конечно, это упрощенная конструкция и она работает только для простых задач. Для более сложных сценариев используйте Kinesis (как указано в других ответах).

здесь ссылка на документацию AWS по тема.

Я считаю, что AWS теперь придумал способ, когда SQS может вызвать функцию лямбда. Поэтому я думаю, что мы можем использовать SQS для сглаживания пакетных нагрузок данных для динамо-машины, если вас не волнует порядок сообщений. Проверьте свой блог на этом новом обновлении: https://aws.amazon.com/blogs/aws/aws-lambda-adds-amazon-simple-queue-service-to-supported-event-sources/