@qq-framework/task-manager
TypeScript icon, indicating that this package has built-in type declarations

1.0.0 • Public • Published

Como reprocessar tarefas de forma automática

O processo de reprocessamento é uma funcionalidade que permite a execução de tarefas de forma controlada, com tentativas repetidas em caso de falha. Ele é útil para tarefas que podem falhar temporariamente devido a problemas de rede, serviços externos indisponíveis, entre outros.

Pré-requisitos

Para utilizar o reprocessamento de tarefas, será necessário importar a model Reprocessamento nas entities do seu arquivo ormconfig.ts.

const entities = ['dist/src/modules/**/infra/models/*.model{.ts,.js}', ReprocessamentoModel]

export default {
    type: 'postgres',
    url: process.env.DATABASE_URL,
    entities,
    synchronize: false,
    logging: false,
    migrations: ['dist/src/shared/infra/typeorm/migrations/*.js'],
    cli: {
        migrationsDir: 'src/shared/infra/typeorm/migrations',
    },
}

Como usar

Atualmente há 2 formas de reprocessar tarefas de forma automática, são elas:

  1. Utilizando o decorator @RetryableEventPattern e o interceptor @RetryableEventPatternDlqInterceptor, ambos utilizam o Kafka para reprocessar as mensagens.

  2. Utilizando a classe ControleProcessamento e a classe ControleReprocessamento, que são classes que permitem reprocessar tarefas de forma automática, sem a necessidade de utilizar o Kafka (mas que também podem implementar envio de eventos).

1. Utilizando o decorator @RetryableEventPattern e o interceptor @RetryableEventPatternDlqInterceptor

Para utilizar o decorator @RetryableEventPattern e o interceptor @RetryableEventPatternDlqInterceptor, você precisa seguir 2 principais etapas, são elas:

Etapa 1: Criar o Processamento

Inserir o decorator @RetryableEventPattern no método que deseja reprocessar, informando os parâmetros necessários.

    @RetryableEventPattern(
        param1,
        param2,
        param3,
        param4
    )

O decorator RetryableEventPattern recebe quatro argumentos:

  1. Param1 -> topico: O nome do tópico do Kafka que será enviado a mensagem em caso de falha.

  2. Param2 -> quantidadeCiclos: O número máximo de ciclos de reprocessamento. O valor padrão é 1.

  3. Param3 -> quantidadeTentativasPorCiclo: O número máximo de tentativas por ciclo. O valor padrão é 3.

  4. Param4 -> processo: O nome do processo à ser executado.

Exemplo de uso
    @RetryableEventPattern(
        KafkaTopics.IMAGEM_PRODUTO_IMPORTADA_FALHA_TOPIC,
        1,
        3,
        'ImportacaoController.atualizarProdutoImagemFalha'
    )
    public async atualizarProdutoImagemFalha(
        @Payload() mensagem: AtualizarStatusImportacaoUseCaseProps,
        @Ctx() context: KafkaContext
    ): ResultAsync<any, void> {
        return await this.atualizarProdutoImagemUseCase.execute(mensagem)
    }

Nesse exemplo, o método atualizarProdutoImagemFalha será reprocessado uma vez, com até 3 tentativas por ciclo, e o processo será chamado ImportacaoController.atualizarProdutoImagemFalha.

OBS: É necessário que seja fornecido o KafkaContext nos parâmetros

Etapa 2: Criar o Reprocessamento

Inserir o interceptor @RetryableEventPatternDlqInterceptor no controller que deseja reprocessar, informando os parâmetros necessários.

    @UseInterceptors(RetryableEventPatternDlqInterceptor)

O interceptor RetryableEventPatternDlqInterceptor não recebe argumentos. Mas, para que o reprocessamento funcione, é necessário que o método reprocessar seja chamado periodicamente, no tempo que achar necessário e que a rota do endpoint seja chamada passando os seguintes parâmetros no body:

{
    "topico": "nome-do-topico",
    "processo": "nome-do-processo",
    "minutosEntreCiclos": 1
}

OBS: O valor padrão para o parâmetro minutosEntreCiclos é 1 minuto.

Exemplo de uso
    @Post('/teste')
    @UseInterceptors(RetryableEventPatternDlqInterceptor)
    public async atualizarProdutoImagemSucesso(@Body() props: PROPS_TYPE): Promise<HttpResponseError | HttpResponseOk> {
        return super.buildResponse({
            result: R.ok(),
        })
    }

Neste exemplo, para reprocessar o processo dado como exemplo anteriormente, a rota /teste será chamada da seguinte forma:

curl --location 'http://IP:PORTA/PATH/MODULO/teste' \
--header 'Content-Type: application/json' \
--data '{
    "topico": "nome-do-topico",
    "processo": "ImportacaoController.atualizarProdutoImagemFalha",
    "minutosEntreCiclos": 1
}'

2. Utilizando a classe ControleProcessamento e a classe ControleReprocessamento

Para utilizar o controle de processamento, você precisa seguir 2 principais etapas, são elas:

Etapa 1: Criar o Processamento

Há duas maneiras de criar um processamento, a primeira é utilizando o método processar e a segunda é utilizando o método processarComCallback. A diferença entre eles é que o método processar irá realizar a função passada como parâmetro novamente, de forma autómatica, o número de vezes padronizado. Enquanto o método processarComCallback permite que você defina um callback personalizado, que será executado quando a função principal falhar. Segue abaixo os exemplos de uso de ambos os métodos:

Método processar

Instanciar a classe ControleProcessamento informando os parâmetros e chamar o método processar.

const controle = new ControleProcessamento(param1, param2)

await controle.processar(param3, param4)

O classe ControleProcessamento recebe dois argumentos:

  1. Param1 -> processo: O nome do processo à ser executado

  2. Param2 -> parametros: Objeto que contém os parâmetros para o processamento.

O método processar recebe dois argumentos:

  1. Param3 -> props: Os dados que serão utilizados para chamar a callback

  2. Param4 -> callback: A função que será executada

Os parâmetros para o processamento são:

  • quantidadeCiclos: O número máximo de ciclos de reprocessamento.
  • quantidadeTentativasPorCiclo: O número máximo de tentativas por ciclo.
  • cicloAtual: O ciclo atual. Este parâmetro é opcional e deve ser fornecido apenas se você deseja reprocessar um ciclo específico.
  • numeroTentativa: O número da tentativa atual. Este parâmetro é opcional e deve ser fornecido apenas se você deseja reprocessar uma tentativa específica.
  • id: O id do processamento. Este parâmetro é opcional e deve ser fornecido apenas se você deseja reprocessar um processamento específico.

OBS: Ao informar os parâmetros opcionais, tenha em mente que o processo poderá não seguir a ordem padrão de reprocessamento.

Exemplo:

const parametros = {
    quantidadeCiclos: 3,
    quantidadeTentativasPorCiclo: 3,
}
Exemplo de uso

Aqui está um exemplo de como usar o processo para atualizar o estoque em um serviço externo:

const controle = new ControleProcessamento(ParallelStockUpdaterServiceImpl.name, {
    quantidadeCiclos: 3,
    quantidadeTentativasPorCiclo: 3,
})

await controle.processar(payload.data, async (processamento: Processamento) => {
    const retornoVtex = await this.vtexService.updateInventoryBySkuAndWarehouse({
        skuId: processamento.props['idSkuVtex'],
        quantity: processamento.props['totalAvailableEcm'],
        warehouseId: processamento.props['idWarehouse'],
        branchId: processamento.props['idBranch'],
    })

    if (retornoVtex.isFailure()) {
        return R.failure(new ErroProcessamento('Algum erro ocorreu ao atualizar o estoque!'))
    }

    return R.ok()
})

OBS: Neste exemplo, a tarefa é atualizar o inventário em um serviço externo. Se a atualização falhar, a tarefa será reprocessada até 3 vezes por ciclo, com até 3 ciclos, e um intervalo de 60 minutos entre os ciclos.

Método processarComCallback

A diferença entre o método processar e o método processarComCallback é que o método processarComCallback permite que você defina um callback personalizado, que será executado quando a função principal falhar. Portanto, a utilização é a mesma, com a diferença de que o método processarComCallback recebe um callback como parâmetro.

Exemplo de uso

Aqui está um exemplo de como usar o processo para atualizar o estoque em um serviço externo:

const controle = new ControleProcessamento(ParallelStockUpdaterServiceImpl.name, {
    quantidadeCiclos: 3,
    quantidadeTentativasPorCiclo: 3,
})

await controle.processar(
    payload.data,
    async (processamento: Processamento) => {
        const retornoVtex = await this.vtexService.updateInventoryBySkuAndWarehouse({
            skuId: processamento.props['idSkuVtex'],
            quantity: processamento.props['totalAvailableEcm'],
            warehouseId: processamento.props['idWarehouse'],
            branchId: processamento.props['idBranch'],
        })

        if (retornoVtex.isFailure()) {
            return R.failure(new ErroProcessamento('Algum erro ocorreu ao atualizar o estoque!'))
        }

        return R.ok()
    },
    async (processamento: Processamento) => {
        // Neste callback, você pode fazer o que quiser, como enviar uma mensagem para um tópico do Kafka
        await this.kafkaProducerService.post({
            conteudo: JSON.stringify({
                correlationId: processamento.id,
                cycles: processamento.cicloAtual,
                retries: processamento.numeroTentativa,
                data: {
                    ...processamento.props,
                },
            }),
            chave: processamento.id,
            topico: KafkaTopics.REPROCESSED_STOCK,
            headers: headers,
        })

        return R.ok()
    }
)

Etapa 2: Criar o Reprocessamento

Quando um Processamento atinge o máximo de tentativas, ele é marcado como PENDENTE e salvo na tabela reprocessamento. Para reprocessar um Processamento marcado como PENDENTE, você deve criar um novo método no controller que sejá chamado periodicamente, no tempo que achar necessário.

OBS: É interessante que não sejam executados mais 1 reprocessamento ao mesmo tempo, pois pode acabar gerando concorência. Para isso, é possível utilizar as configurações do AbstractUseCase, definindo o processo com singleThread

Esse método deve chamar algum processo que instâncie a classe ControleReprocessamento e chamar o método reprocessar,

const controle = new ControleReprocessamento(param1, param2)
await controle.reprocessar(param3)

A classe ControleReprocessamento recebe dois argumentos:

  1. Param1 -> processo: O nome do processo à ser executado

  2. Param2 -> minutosEntreCiclos: O número de minutos entre um ciclo e outro utilizado como filtro na query feita na tabela Reprocessamento, tendo como base o valor da coluna dt_criacao. O valor padrão é 1 minuto.

O método reprocessar recebe uma função de callback que será executada para cada Processamento marcado como PENDENTE.

OBS 1: O método reprocessar irá reprocessar todos os processamentos marcados como PENDENTE que estão dentro do intervalo de tempo definido pelo parâmetro minutosEntreCiclos.

Exemplo de uso

Aqui está um exemplo de como usar o reprocessamento para o mesmo processo de atualização de estoque em um serviço externo:

    @Post('/reprocessar-teste')

    async teste(): Promise<HttpResponseOk | HttpResponseError> {
        const controle = new ControleReprocessamento(
            ParallelStockUpdaterServiceImpl.name,
            1
        )

        await controle.reprocessar(
            async (processamento: Processamento) => {
                const retornoVtex = await this.vtexService.updateInventoryBySkuAndWarehouse({
                    skuId: processamento.props['idSkuVtex'],
                    quantity: processamento.props['totalAvailableEcm'],
                    warehouseId: processamento.props['idWarehouse'],
                    branchId: processamento.props['idBranch'],
                })

                if (retornoVtex.isFailure()) {
                    return R.failure(new ErroProcessamento('Algum erro ocorreu ao atualizar o estoque!'))
                }

                return R.ok()
            }
        )

        return super.buildResponse({
            result: R.ok(`Reprocessamento iniciado!`),
            successStatusCode: 202,
        })
    }

Exceções

Existem três exceções que podem ser lançadas dentro da função de callback e que são tratadas pelo processo de reprocessamento:

  1. ErroProcessamento: Esta exceção deve ser lançada quando ocorrer um erro que pode ser resolvido automaticamente. O processo de reprocessamento irá tentar novamente. Se esta exceção for lançada, o processo será marcado como PENDENTE e será reprocessado conforme o parâmetro minutosEntreCiclos.
  2. ProcessamentoIrrecuperavelException: Esta exceção deve ser lançada quando ocorrer um erro que não pode ser resolvido automaticamente. O processo de reprocessamento irá parar e o erro será retornado. Se esta exceção for lançada, o processo será marcado como ERRO_PERMANENTE e não será reprocessado.
  3. CicloInvalidoException: Esta exceção deve ser lançada quando ocorrer um erro que não pode ser resolvido automaticamente neste ciclo. O processo de reprocessamento irá parar e o erro será retornado. Se esta exceção for lançada, o ciclo atual será ignorado E O processo será marcado como PENDENTE e reprocessado conforme o parâmetro minutosEntreCiclos.

Readme

Keywords

none

Package Sidebar

Install

npm i @qq-framework/task-manager

Weekly Downloads

1

Version

1.0.0

License

ISC

Unpacked Size

37 kB

Total Files

15

Last publish

Collaborators

  • carlos.castro.qq
  • guilhermegotin
  • fn_qq
  • juniordeitch
  • 6porto
  • lealhugui