Kotlin Coroutines — Uma abordagem diferente para programação concorrente
Artigo atualizado em 29/07/2023 para correção e inclusão de detalhes sobre Simetria e Assimetria de Coroutines.
Artigo atualizado em 17/06/2024 para maior esclarecimento entre Stackful e Stackless, Simétricas e Assimétricas.
Programação assíncrona, concorrente e “não-bloqueante” é um tópico extenso com inúmeras abordagens e formas diferentes de se trabalhar e que entrega um grande poder, e também uma grande responsabilidade.
Com a programação concorrente, podemos compartilhar os mesmos recursos com diversas tarefas, e, se utilizada de maneira correta, melhorar substancialmente a performance das nossas aplicações
Hoje vamos conhecer as Coroutines do Kotlin, um recurso muito poderoso para programação concorrente, disponível nativamente na linguagem.
Nota: nesse artigo, estaremos falando mais precisamente das construções fornecidas pela biblioteca Kotlinx Coroutines, que é uma extensão das Coroutines nativas disponíveis na linguagem Kotlin.
O que é programação concorrente, paralela, assíncrono e não-bloqueante?
Cada um desses conceitos tem suas peculiaridades, e, apesar de muitas vezes alguns termos serem usados de forma intercambiável, são diferentes em sua essência.
Programação concorrente
Define um conceito onde diferentes tarefas estão ativas ao mesmo tempo e, muitas vezes compartilhando os mesmos recursos, mas não necessariamente executando ao mesmo tempo.
Na linguagem Java, percebemos isso muitas vezes quando alguma operação lança um erro de ConcurrentModificationException, mesmo quando estamos trabalhando com código single-thread.
Por exemplo, veja o código abaixo:
Esse código irá falhar com ConcurrentModificationException mesmo sendo um código sequencial e single-thread, isso pois ocorre uma concorrência entre a tarefa de leitura e a tarefa de escrita. Esse cenário pode ser resolvido bloqueando a escrita quando houver alguma leitura ocorrendo, ou utilizando os meios corretos para modificar uma lista enquanto percorre a mesma, como ao usar um Iterator<T>
.
Apesar do exemplo ser curto, a modificação poderia ocorrer em uma lógica externa que esteja concorrendo pela utilização desse recurso.
Outro bom exemplo seria em um Event-Loop, onde uma tarefa pode devolver o controle da execução quando uma operação de I/O ocorrer. Imagine duas tarefas: Tarefa A e a Tarefa B. Primeiro a Tarefa A inicia uma conversa com o banco para pedir um usuário, mas já que isso pode levar um tempo ela devolve o controle para o agendador (Event-Loop). Agora, com o controle, o agendador inicia a Tarefa B, que por sua vez faz uma inserção de um usuário no banco, e como isso também pode demorar, ela devolve o controle para o agendador (Event-Loop). Note que as duas tarefas estão ativas, elas só continuam após o banco responder, e nesse caso, o banco está sofrendo pela concorrência da leitura e da escrita. Por isso esse código é concorrente, as tarefas concorrem por um recurso, mesmo executando em momentos distintos.
Nas coleções do Java a mesma coisa pode ocorrer, duas tarefas independentes podem concorrer pelo acesso e leitura dos dados.
Normalmente, concorrer pelo acesso não tem problema, mas quando ocorre uma escrita por uma das tarefas, uma delas pode falhar ao tentar ler algo que não existe mais, mas que existia quando ela começou a ler. Por isso coleções regulares falham quando isso ocorre, e ai implementações que suportam concorrência precisam ser utilizadas, e elas tem seu custo.
Programação assíncrona
Uma tarefa assíncrona é uma tarefa que executa em algum momento no futuro, ou seja, é uma tarefa que pode rodar em background enquanto o fluxo regular da aplicação continua.
Porém, assincronismo não implica em concorrência ou paralelismo, uma tarefa assíncrona pode ser executada exatamente no momento que é requisitada ou depois, na mesma Thread ou em outra Thread, de forma concorrente ou não, paralelamente ou não.
Uma tarefa assincrona não implica em concorrência nos termos gerais, pois tarefas assíncronas concorrem de várias formas, principalmente pelo tempo de processamento.
A forma e o momento em que uma tarefa assíncrona é executada depende da implementação utilizada para delegar essa tarefa, bem como de outros fatores, internos e externos.
De forma resumida, uma tarefa assíncrona é uma tarefa que pode ser executada em qualquer momento, normalmente seu resultado pode ser utilizado no futuro, seja pelo mesmo código que pediu sua execução, ou por outro código anexado, ou seu resultado pode ser simplesmente ignorado.
CompletableFutures do Java é um ótimo exemplo, você pode usar o método get
para esperar a tarefa completar (indefinidamente ou por um certo tempo), ou utilizar as funções run
, handle
, then
ou when
para anexar um código responsável por tratar os resultados, ou simplesmente nunca usar o resultado da execução.
Paralelismo
É o conceito de duas tarefas executarem exatamente ao mesmo tempo, algo apenas possível em processadores com mais de um núcleo.
Quando uma tarefa executa em paralelo, ela está executando paralelamente a outra tarefa, isso normalmente se restringe a tarefas lançadas e no controle do software em questão.
Normalmente nunca falamos que uma tarefa roda em paralelo pois outro processo de outro programa fora do nosso controle está rodando uma outra tarefa ao mesmo tempo, mesmo que isso seja verdade, não faz sentido para os termos gerais de paralelismo. Porém, assumir isso pode fazer questão em sistemas que são multi-processos e controlam ou se comunicam com outros processos.
Por meio do paralelismo, é possível executar duas ou mais lógicas intensivas ao mesmo tempo, e ter seu resultado no tempo que apenas uma levaria.
Mas é importante ressaltar que a tarefa intensiva em questão precisa ocupar a CPU o tempo todo, caso contrário, até mesmo ambientes single-core podem ser capazes de terminar duas tarefas no tempo de uma, e é assim que o Node, por exemplo, é capaz de manter a alta performance sendo um ambiente single-thread.
Você pode entender melhor sobre isso nesse artigo:
Programação não-bloqueante
Esse é um conceito bem simples, a ideia é que nenhum código possa bloquear a execução de outras tarefas.
No Java, isso pode ser feito ao utilizar a programação reativa, visto que, ao utilizar Threads ou Thread Pools, quando uma tarefa bloquear, a Thread em especifico ficará bloqueada e ninguém mais poderá aproveitar dela para continuar a execução.
Ao utilizar Thread Pools, se, por exemplo, houverem 4 Threads fixas nesse Pool, se 4 tarefas bloquearem, nenhuma outra poderá executar, e provavelmente serão colocadas em uma fila (Queue) para serem processadas quando alguma Thread liberar.
Ao utilizar programação reativa, ou Coroutines, tarefas que antes bloqueavam, podem utilizar construções que as permitem entregar o controle da execução e retormar essa execução quando o recurso estiver pronto.
Em outra palavras, operações de I/O, por exemplo, podem ser delegadas para o gerenciador responsável, junto a um callback que será chamado quando a execução terminar, e a partir dai, esse código devolve o controle para a Runtime, que pode agora agendar outra tarefa para ser executada. Quando a operação de I/O terminar, a callback informada será chamada e a execução resumida.
Tarefas de I/O podem ser escaladas para um Pool com uma quantidade generosa de Threads, pois essas Threads não estarão usando tempo de processamento, outra alternativa é utilizar as APIs de I/O assíncrono.
Com a programação não-bloqueante, tarefas que bloqueiariam a execução, desperdiçando tempo de processamento, não o fazem mais, e permite que outras tarefas, que irão gastar o tempo de CPU fazendo algo útil, o façam sem interferir na capacidade das demais tarefas de esperar por algum recurso.
O que são Coroutines?
E então, chegamos nas Coroutines.
Coroutines são componentes da programação que permitem a suspensão e o resumo da execução de uma sub rotina. Em outras palavras, permitem que a execução de uma função “pare” em um ponto do código e continue desse mesmo ponto quando forem requisitadas para o fazer.
Stackful vs Stackless
Quando uma Co-rotina é suspensa, ela precisa saber onde resumir e qual o estado anterior dela.
Coroutines Stackful armazenam essa informação na Stack, muito similar a uma função regular. Alternativamente, podem armazenar também em uma estrutura de dados gerenciada pela Runtime, porém, ainda uma estrutura de Stack.
Coroutines Stackless armazenam o seu estado na Heap, incluindo a informação de qual “segmento de código” ela deve prosseguir, e variáveis em escopo.
O grande diferencial é: suspender e resumir uma Coroutine Stackful se resume, em maior parte, em “copiar” a Stack. Inclusive, resumir esse tipo de Coroutine é muito "barato": apenas restaura a Stack e continua da onde parou. O que é muito similar ao que acontece com uma Thread ao ser pausada e resumida pelo Scheduler.
Já por outro lado, a Coroutine Stackless, ao ser suspensa, precisa armazenar todas variáveis em escopo que serão utilizadas em qualquer ponto futuro, e uma variável de estado que diz que a Coroutine precisa avançar para um certo ponto ao ser resumida. Esses pontos em questão são definido pelo compilador, que divide o código em seções, cada seção contendo um ponto de suspensão (suspension point).
Ao ser resumida, a Coroutine irá utilizar uma switch-table para fazer branching para a parte onde a execução deve continuar, que será a seção adjacente a que suspendeu. Esse é o modelo de uma máquina de estado.
Simétricas vs Assimétricas
Além de Stackful e Stackless, também temos as Co-rotinas Simétricas e Assimétricas.
A diferença aqui agora é sobre controle de fluxo de execução.
As Co-rotinas assimétricas tem uma função para suspender (yield) e uma função para resumir as Coroutines (resume). Nesse modelo, qualquer função pode resumir uma Co-rotina que tenha sido suspensa. Assim são as Co-rotinas da linguagem Lua.
Já nas Co-rotinas simétricas, existe apenas uma função para passar o controle entre as Co-rotinas. É assim que acontece no Kotlin. Sempre ao chamar uma Co-rotina você está passando o controle a ela, e essa Co-rotina ao retornar está devolvendo o controle. Essa é a única maneira de se passar controle nesse modelo, não existe uma função para resumir uma Coroutine, e que possa ser chamada de qualquer lugar.
Esse conceito pode ser um pouco difícil de entender, então vamos expressar isso em código, imagina que você tem duas Co-rotinas:
co1 = coroutine.create(function()
...
end)
co2 = coroutine.create(function()
...
end)
Já explico o motivo de estar usando Lua :).
No caso de Co-rotinas Assimétricas, a co2
pode passar o controle para a co1
dessa maneira:
co1 = coroutine.create(function()
print("co1: recvd control, passing to co2")
coroutine.resume(co2)
print("co1: control is back from co2, continuing co2...")
coroutine.resume(co2)
print("co1: control is back, finishing...")
end)
co2 = coroutine.create(function()
print("co2: recvd control, yielding")
coroutine.yield()
print("co2: control is back, finishing...")
end)
coroutine.resume(co1)
-- Prints:
-- co1: recvd control, passing to co2
-- co2: recvd control, yielding
-- co1: control is back from co2, continuing co2...
-- co2: control is back, finishing...
-- co1: control is back, finishing...
E a co2
pode ser resumida por qualquer um que tenha acesso a sua instância, ou seja, isso também é possível:
co1 = coroutine.create(function()
print("co1: recvd control, passing to co2")
coroutine.resume(co2)
print("co1: control is back, finishing...")
end)
co2 = coroutine.create(function()
print("co2: recvd control, yielding")
coroutine.yield()
print("co2: control is back, finishing...")
end)
coroutine.resume(co1)
coroutine.resume(co2)
-- Prints:
-- co1: recvd control, passing to co2
-- co2: recvd control, yielding
-- co1: control is back, finishing...
-- co2: control is back, finishing...
Aqui quem inicia a co2
é a co1
, mas quem resume a co2
é o nosso código principal, não quem iniciou a Co-rotina originalmente.
Já no Kotlin, com Co-rotinas Simétricas, isso não é possível, elas só podem passar o controle dessa maneira:
co1 = coroutine.create(function()
print("co1: recvd control, yielding back to parent")
coroutine.yield()
print("co1: control is back, finishing...")
end)
co2 = coroutine.create(function()
print("co2: recvd control, yielding back to parent")
coroutine.yield()
print("co2: control is back, finishing...")
end)
coroutine.resume(co1) -- Prints: “co1: recvd control, yielding back to parent”
coroutine.resume(co2) -- Prints: “co2: recvd control, yielding back to parent”
coroutine.resume(co1) -- Prints: “co1: control is back, finishing...”
coroutine.resume(co2) -- Prints: “co2: control is back, finishing...”
Ou seja, elas só podem fazer yield
para a Co-rotina invocadora, não tem como elas escolherem qual Co-rotina irá receber o controle. Elas até podem invocar outra Co-rotina, mas nesse cenário ela estará iniciando uma nova Co-rotina, e não resumindo e nem colaborando entre diferentes Co-rotinas que não tenham relação direta entre si.
Claro que toda essa manipulação é feita automaticamente pelo compilador, você não precisa fazer isso na mão.
E o motivo para eu usar Lua como exemplo não é só pelo fato de ter uma implementação exemplar de Co-rotinas, mas também pela flexibilidade que elas fornecem.
Pontos importantes
Stackful Simétricas e Assimétricas, Stackless Simétricas e Assimétricas:
Apesar do nosso exemplo ser de Co-rotinas Stackful Assimétricas, como a do Lua, podemos ter Co-rotinas Stackful Simétricas também, como se pode ser observado no segundo exemplo que imita as Co-rotinas do Kotlin.
E tecnicamente falando, também podemos ter Co-rotinas Stackless Simétricas, porém não temos nenhum exemplo de implementação devido as dificuldades de manipular o estado e os pontos de suspensão, e talvez não seja muito viável visto que não temos nenhum problema que já não possamos resolver com as implementações atuais.
Goroutines não são exatamente Co-rotinas
Apesar das similaridades, o Go tem uma implementação bem diferente, podemos sim classificar como Co-rotinas, mas apenas se não formos seguir estritamente as especificações.
Por exemplo, uma Goroutine não consegue passar controle diretamente para outra por si só, isso é gerenciado pelo Scheduler, isso coloca as Goroutines em uma situação não usual: não podem ser Assimétricas nem Simétricas pelo mesmo motivo: não podem passar controle nem devolver controle. Uma Goroutine que lança outra, nunca deixa de ter o controle, enquanto a Goroutine lançada, também não tem como devolver controle, pois ninguém nunca entregou o controle da execução.
No entanto…
O modelo simétrico e assimétrico podem implementar/simular o comportamento um do outro
Uma Co-rotina que seja simétrica consegue “implementar” (o mais correto seria simular) um modelo assimétrico e o mesmo para uma Co-rotina assimétrica, que pode “implementar” o modelo simétrico.
E como isso é possível? Bom, de várias maneiras, uma Co-rotina simétrica pode implementar uma assimétrica utilizando de uma terceira Co-rotina de Scheduling que ficará responsável por executar as Co-rotinas requisitadas.
Já uma Co-rotina assimétrica pode implementar a simétrica utilizando o modelo regular de yielding imediatamente seguido por resume.
Apesar da possibilidade, não é assim que fazemos normalmente, apenas adaptamos o nosso código ao modelo empregado na linguagem. Também existem alguns detalhes adicionais para se levar em consideração ao tentar simular o outro modelo, e isso pode afetar o desempenho, a eficiência e até dificultar a manutenção de depuração. Salvo nos casos onde a linguagem em si já fornece Co-rotinas mais flexíveis.
E além disso, estruturas como Channel
podem ser usadas para passar informações e controle entre as Co-rotinas, ou até implementar/simular um modelo diferente.
Paralelismo
As Coroutines podem ser multiplexadas para diferentes Threads do sistema, permitindo assim programação paralela e concorrente, além de assíncrona.
Isso pois, o “agendador” das Coroutines, pode trocar entre diferentes Coroutines quando uma tarefa bloqueante ocorrer.
Exemplo prático
Um exemplo prático para o uso de Coroutines é na chamada para executar uma consulta no banco de dados.
Tome um exemplo de uma API que busca todos posts de um blog no banco de dados, em um código sequencial e síncrono, a Thread que está atrelada a esta requisição ficará bloqueada até que os dados voltem do banco, impedindo que qualquer outra tarefa se aproveite dela para continuar seu trabalho. Threads são recursos que tem seus custos, e deixar esse recurso ocioso não é algo positivo.
Uma das formas de resolver esse problema é com a programação reativa, que funciona sobre um conceito de loop de eventos, permitindo que outras tarefas sejam processadas enquanto o banco de dados não retorna os dados. No entanto, isso envolve pensar de uma forma muito diferente, requer muitas mudanças centrais em códigos já existentes e pode acabar em códigos menos legível e com muitas “callbacks”.
Esse problema também pode ser resolvido com as Coroutines, quando uma operação bloqueante ocorrer, o agendador simplesmente suspende a execução da função e começa a trabalhar em outra Coroutine, quando a tarefa terminar, a Coroutine pode ser resumida para continuar seu trabalho. Umas das vantagens são: menor atrito na migração para Coroutines, código mais legível e pensamento próximo ao imperativo/procedural (ou seja, menos “callbacks”).
Utilizando as Coroutines
Suspend
O mais básico e central das Coroutines é a palavra-chave suspend
, que marca nossa função como uma função que pode ser suspendida:
Ao marcar nossa função, o compilador Kotlin saberá que precisa fazer uma engenharia diferente na compilação, implementando um conceito de Máquina de Estados.
Com essa marcação, podemos chamar outras funções que suspendem, algo que não podemos fazer nas funções regulares, pois a runtime não saberia como suspender e resumir essas funções.
Quando é absolutamente necessário chamar uma coroutine a partir de funções regulares, temos que utilizar o runBlocking
:
Porém, a função runBlocking
irá bloquear a Thread atual até que a Coroutine complete.
Sempre que possível, utilize a palavra-chave suspend
nas funções quando for chamar as co-rotinas:
Chamar tarefas bloqueantes
Caso você tente chamar tarefas bloqueantes, como ler um arquivo, de dentro de uma Coroutine, você será alertado:
O motivo disso acontecer é que, ao bloquear uma Coroutine, você perderá todas vantagens de as utilizar, já que a Thread atual será bloqueada e o agendador não poderá executar outros trabalhos enquanto espera a tarefa terminar.
Para fazer esse tipo de chamada, você precisará utilizar o contexto de I/O:
Note que você ainda irá receber um aviso sobre a chamada bloquear, porém, isso é um bug nas inspeções feitas pelo plugin do Kotlin, você pode utilizar a função async
caso queira se livrar do aviso:
Porém isso resulta em uma inspeção que irá recomendar você trocar async
por withContext
:
Você pode simplesmente ignorar.
Como funciona?
As chamadas que bloqueiam a Thread serão enviadas a um pool de threads dedicadas a operações de I/O. Após isso, a execução da coroutine irá suspender, e quando a operação terminar, a execução será resumida.
Dessa forma, operações de leitura e escrita em arquivos, sockets ou qualquer operação que bloqueie a Thread, não afetarão a execução das rotinas. Isso inclui requisições HTTP e consultas em bancos de dados.
Executar operações de forma assíncrona
Com as coroutines, podemos executar tarefas de forma assíncrona com muita facilidade:
Isso é possível até mesmo em contextos single-thread
Acima utilizamos um contexto totalmente single-thread, com exceção das operações de I/O, e mesmo assim, a saída do nosso programa reporta que a segunda tarefa foi iniciada, mesmo sem a primeira ter acabado:
O tempo que nosso programa levou foi de menos que 2s, mesmo com duas tarefas que pausam por 1s.
Isso é algo muito interessante, pois se bem utilizado, ao invés do tempo de resposta estar atrelado a soma do tempo levado por cada tarefa, estará atrelado ao tempo que a tarefa mais lenta levar.
Claro, considerando que todo código possa ser assíncrono e as tarefas não sejam dependentes umas das outras. Caso sejam, pode ser necessário um retrabalho no código com um pensamento diferente.
Dica
Uma dica que eu dou ao migrar para código concorrente e assincrono, independente de coroutines, programação reativa, thread pools e CompletableFuture, ou qualquer outra abordagem, é: você poderá ter que reduzir as grandes operações em operações menores que rodam em paralelo.
Um bom exemplo é com consultas em bancos NoSQL, caso você precise buscar posts de um blog, e depois as categorias de cada post, você pode quebrar em 3 tarefas menores:
- Buscar todas categorias registradas
- Buscar todos os posts
- Associar as categorias aos posts
Ou você pode:
- Buscar todos os posts
- Buscar as categorias com base nos posts
- Associar as categorias aos posts
Isso dependerá de qual abordagem é mais eficiente, e se o blog tem tantas categorias que não vale a pena trazer tudo em memória.
Pois na primeira abordagem, você pode rodar as duas operações de forma assíncrona, uma não dependerá da outra.
Enquanto na segunda abordagem, a consulta feita pelo segundo passo depende de dados que ainda não vieram no primeiro passo.
Em outros cenários, você pode até preferir dar mais trabalho para o banco de dados, fazendo mais requisições em paralelo e consultas mais complexas, do que criar uma dependência entre as operações. Como, por exemplo, ao invés de rodar a associação na sua aplicação, pedir ao banco de dados retornar a associação entre os dados, assim você teria uma tarefa a menos para a aplicação, e uma a mais para o banco.
De qualquer forma, você precisará aprender qual é a melhor abordagem e com melhor performance, validando se a dependência entre as operações custa mais do que delegar ao banco.
Digo isso pois recentemente (não mais recentemente, já que o artigo está pronto a uns 2 meses) tive que atualizar um código legado que fazia uma, duas, e as vezes até três consultas no MongoDB, que retornava vários elementos, e para cada elemento de cada consulta, fazia uma outra consulta no banco, na tentativa de atingir um resultado equivalente a um join entre tabelas em bancos relacionais. Durante a migração, foi possível reduzir em no máximo 4 consultas ao banco, sem a necessidade de fazer consultas adicionais para cada retorno.
Isso só foi possível pois a quantidade de dados não era tão grande, e esses dados crescem menos que uma dezena por mês, as vezes não chegam nem a mudar durante meses. Isso envolveu fazer consultas mais complexas, antes fazíamos apenas consultas de busca simples, passamos a fazer consultas com agregação, tanto para fazer associação como computar o número de ocorrências.
Nesse caso acabamos dando menos trabalho ao banco, pois nos livramos de consultas adicionais desnecessárias, com o contraponto de rodar consultas menos simples, o que se provou valer mais a pena. O tempo de resposta de algumas operações caiu de 7 segundos para no máximo 1 segundo, com a média em poucos milissegundos. E isso não afetou de forma perceptível o uso de CPU e memória do banco de dados. Ao mesmo tempo beneficiou o microsserviço, que tem quantidade de memória e CPU bem limitadas.
Contexto
Todas Coroutines executam em um certo “contexto”, esse contexto armazena dois valores importantes, o Job representando a execução, e o agendador utilizado para despachar a execução.
Esse contexto é extremamente importante para termos concorrência estruturada, que permite que, caso um Job falhe, os demais Jobs iniciados por ele sejam cancelados, realizando a propagação do erro.
Além disso, os contextos podem ser utilizados para armazenar informações a serem compartilhadas com todas outras Coroutines, que tenham sido iniciadas nesse mesmo contexto, ou seja, elas herdam os contextos, criando uma hierarquia entre eles.
Agendadores
Existem 4 principais agendadores de Coroutines: Default, IO, Main e Unconfined.
Default
É o agendador padrão utilizado quando nenhum é especificado, principalmente nas funções launch e async. Esse agendador utiliza um pool de Threads compartilhado, com número de Threads equivalente ao número de núcleos do processador, ou no mínimo duas Threads (quando o sistema reporta que tem apenas um núcleo).
IO
Esse, por sua vez, é utilizado para delegar tarefas de I/O, como tarefas de leitura e escrita de arquivos, comunicações de rede, comunicação entre processos, e assim por diante.
Esse agendador é muito útil para tarefas que bloqueiam a Thread atual, já que isso não pode ocorrer em uma Coroutine (por motivos já citados), principalmente em lógicas bloqueantes de bibliotecas externas ou fora do nosso controle.
Tarefas despachadas para o agendador IO escalam para até 64 Threads, ou o número de núcleos disponíveis (qual for maior), e esse valor pode ser configurado por meio de propriedades da aplicação. A maior parte das Threads, se não todas, estarão, na maioria das vezes, em um estado dormente, a espera da conclusão de alguma tarefa ou a liberação da execução, isso garante que um grande número de tarefas de IO recebam a devida atenção.
Main
Agendador para enviar tarefas para a mesma Thread responsável pela UI, a ativação desse agendador depende da presença de certas dependências em sub-módulos do Kotlinx Coroutines, como a dependência do Android, JavaFx ou Swing.
Normalmente, esse agendador é single-threaded (pois as UIs costumam rodar em apenas uma Thread).
Unconfined
Esse é bem especial, ele não utiliza nenhuma Thread em especial, ele executa as Coroutines no mesmo stack-frame, e Coroutines executadas dentro dessa mesma usando esse agendador, formam um loop de eventos, para evitar StackOverflows.
Além disso, a ordem de execução não pode ser garantida e depende da implementação utilizada, o que pode diferir bastante em diferentes ambientes, como com Kotlin/Native, JS, Kotlin/JVM e no Android.
Valores definidos pelo usuário
É possível também armazenar valores definidos pelo próprio usuário por meio do Contexto:
O código acima irá resultar em:
Ou seja, bibliotecas podem fornecer valores utilizando o contexto, o que pode se mostrar útil em vários cenários, como, por exemplo, obter o usuário autenticado que fez uma requisição, mesmo quando o código que busca esta informação está muito distante do código que as fornece.
Continuação
As Coroutines do Kotlin são implementadas utilizando um modelo chamado CPS (Continuation Passing Style), onde a função passa para seguinte um objeto de continuação, que será responsável por receber esse valor e executar o código responsável por tratar.
Por exemplo, abaixo temos um código utilizando o CPS:
Nós poderíamos obter os nomes dos usuários assim:
Note que todas funções são desprovidas de um retorno, no CPS, a função retorna como a chamada de um callback, o retorno na verdade se torna o argumento da chamada de uma outra função.
Nota importante: esse não é o mesmo código aplicado nas Coroutines do Kotlin, mas tem uma grande similaridade, como pode ser notado próprio código fonte.
Uma grande vantagem do CPS é que, código escritos dessa maneira são facilmente paralelizáveis, e técnicas como Trampolining e até mesmo Event-Loop podem facilmente serem aplicadas, o que o Kotlin faz por trás é justamente tornar o código sequencial, imperativo, em um código com CPS e Suspension.
Suspension
Aqui que entra um outro conceito extremamente interessante, a suspensão, todas funções anotadas com suspend
retornam, na verdade, um valor do tipo Any
, isso pois o retorno de uma função suspend
pode ser tanto o resultado esperado, caso a função chegue no final da sua execução, ou pode retornar COROUTINE_SUSPENDED, que indica que a execução foi suspendida.
Toda função que suspende recebe uma implementação própria de Continuation, onde é armazenado, além das variáveis em uso pelo trecho de código, um valor chamado label
, que é responsável por dizer em que ponto a execução deve ser resumida.
Assim, na próxima vez que a função for chamada com o objeto de Continuation, a execução irá resumir a partir do ponto representado por esse label
, que pode ser incrementado, caso a execução não termine ali, ou apenas mantido como está, caso ali seja o final da execução, e se for, ao invés de retornar COROUTINE_SUSPENDED, o valor regular resultante da execução será devolvido para quem chamou essa função.
Como o propósito desse artigo não é explicar a fundo as Coroutines, deixo essa recomendação de leitura para os curiosos.
Suspender execuções com suspendCoroutine
Agora que entendemos um pouco dos dois mecanismos (o CPS e a suspensão), vamos olhar o seguinte código:
O código acima irá exibir o usuário Foo
após 1000 milissegundos, isso pois a função suspendCancellableCoroutine
nos fornece um controle melhor das Coroutines, nesse caso, temos o controle de quando ela deve continuar.
A existência desse mecanismo é muito importante para podermos implementar interações de baixo nível, e construir códigos de alto nível para interagir com diversos recursos, como bancos de dados, requisições, e assim por diante.
É muito útil para transformarmos um código baseado em callbacks em Coroutines com suspensão e continuação.
Porém, o código acima ainda tem um problema, não implementamos uma lógica para cancelar o mesmo, quando isso ocorrer. Como as Coroutines implementam concorrência estruturada, e se uma tarefa falhar, as demais devem ser canceladas.
O código abaixo mostra uma das maneiras de implementar o cancelamento de uma Coroutine:
Caso o tratamento do cancelamento não ocorra, a Coroutine será cancelada, tendo sua execução interrompida, no entanto, os recursos iniciados por ela não serão interrompidos também.
Por isso é extremamente importante implementar o cancelamento, quando possível.
Outro ponto importante é que, se o resume
nunca for chamado, sua Coroutine nunca será resumida. Apesar de parecer ruim, poder ser útil para implementar short-circuit em algumas operações, como o arrow-kt faz com o either
.
Flow<T>
Em alguns momentos precisamos fornecer vários elementos para as coroutines. Em uma função regular utilizaríamos List, Sequence ou Stream, porém todas essas estruturas não suportam suspensão.
As implementações de List (ou de qualquer uma Collection) serve para fornecer informações já computadas, a função que chamou precisa esperar até que todos valores estejam disponíveis. Já no caso de Sequence e Stream, são feitas para serem consumida apenas uma vez, e você não consegue chamar funções que suspendem nas suas operações, como dentro de um map e filter, sem contar que são estruturas para trabalhar com operações bloqueantes, ou seja, as operações bloqueiam a Thread atual enquanto esperam pelo próximo elemento.
Já no caso do Flow, ele pode ser consumido inúmeras vezes, cada nova subscrição inicia uma nova produção de elementos, elementos esses que podem ser produzidos de forma assíncrona e concorrente, já que é uma estrutura que utiliza da suspensão.
Outro ponto importante é que, construir um Flow não é uma operação bloqueante, e por isso funções que o constroem não necessariamente são marcadas com suspend
, exceto nos casos que elas chamam funções suspend
.
Isso pode parecer estranho no começo, mas quando você entende a fundo as Coroutines, você vê o motivo de ser assim, e realmente faz sentido. Quando você cria um Flow, você está servindo um estrutura em que suas operações são executadas tardiamente, apenas quando necessárias. Isso faz com que você não precise suspender a execução de uma função que cria um Flow, pois as operações bloqueantes não serão chamadas ao construir essa estrutura.
Apenas quando as funções que precisão que os valores sejam produzidos são chamadas (como a função collect
), que o código que produz os elementos do Flow será chamado, e esse código pode conter operações bloqueantes, ou chamar outras Coroutines, ai sim essa chamada poderá ser suspendida.
Na maioria das vezes, a função que cria um Flow não contém nada além que uma chamada de uma função, que fornece como parâmetro, uma outra função, essa última função que tem o código responsável por produzir os elementos do Flow. É essa função fornecida ao Flow que tem código que pode ser suspendido.
Por exemplo:
Acima temos uma função number
responsável por criar um Flow
que pode emitir números infinitamente, e na nossa função main
apenas obtemos 5 desses números, a função collect
é que suspende, ao chamar ela, o flow
será consumido e fechado, ou seja, não precisamos usar todos os números nem mesmo pagar pelo custo de armazenar todos na memória. Apenas pagamos pelo que usamos.
Porém, o Flow
é uma estrutura extremamente útil para vários cenários, por exemplo, para ler dados do banco:
Ou para ler linhas de um arquivo sob-demanda, para Streaming de dados, e assim por diante, já que cada subscrição a um flow
inicializa uma nova chamada para produzir os elementos.
Importante ressaltar que normalmente o Flow
representa uma cold stream, isso quer dizer que os elementos são produzidos dentro do próprio Flow e uma nova instancia do Flow é criada para cada subscrição.
SharedFlow<T>
Uma variante de que representa uma hot stream, (ou um hot flow), que faz broadcast para todos assinantes desse Flow. Por ser do tipo hot, os valores são produzidos fora dela, e todos assinantes compartilham a mesma instancia.
Replay
Na ausencia de qualquer assinante, os valores são simplesmente jogados fora, porém, é possível configurar um valor de “replay”, que indica a quantidade de valores que serão armazenados no buffer para serem repetidos a todos novos assinantes, assim, caso não exista nenhum, esses valores não são perdidos e serão repetidos a cada nova assinatura.
Por exemplo:
Esse código exibirá:
No entanto, se configurarmos um valor para replay:
O seguinte será exibido:
Isso acontecerá para todas novas subscrições, porém, note que no final emitimos um outro valor, então novas subscrições pegará esse novo valor, dado que nosso valor de replay define que apenas uma mensagem fique em cache para replay.
Dito isso, esse código:
Irá produzir a seguinte saida:
Ou
Já que o código é assíncrono, a ordem pode mudar a cada execução, porém a consistência se mantém, sempre os últimos N valores serão enviados para novos consumidores (sendo N o valor configurado para replay), ou seja, a cada nova assinatura os valores serão reemitidos para esses consumidores, mesmo que já tenham sido consumidos por outros:
extraBufferCapacity
Quando um novo valor é emitido em um SharedFlow<T>
, ele é enviado para todos assinantes, e a função de emissão suspende até que todos assinantes tenham recebido os valores, ou seja, o seguinte código:
Resulta em:
Ou seja, mesmo tendo dois consumidores que executam de forma concorrente e assincrona, a segunda mensagem só é emitida após a primeira ter sido consumida por todos assinantes (como a mensagem All values emitted
sinaliza), e como nossos assinantes tem um delay de 1s dentro do collect
, cada emissão (ou chamada de emit
) irá suspender por 1s.
Isso quer dizer que, um consumidor lento irá causar lentidão na emissão ao mesmo tempo que irá também reduzir a velocidade que os outros consumidores ingerem esses valores, já que a taxa de emissão é reduzida.
O extraBufferCapacity
entra para ajudar com que a emissão continue, até certo ponto, mesmo que existam consumidores que estejam bloqueando a execução. Esse valor sinaliza quantas mensagens adicionais podem ser enviadas até que a chamada ao emit
suspenda:
Esse código resulta em:
Porém, apesar de ser similar ao replay
, o extraBufferCapacity
difere por não reemitir os valores, isso quer dizer que novas subscrições não irão receber valores emitidos anteriormente a subscrição.
No entanto, tanto o replay
quanto o extraBufferCapacity
podem usar o mesmo buffer (e normalmente o fazem), o extraBufferCapacity
define um valor adicional ao que já foi definido no replay
, ou seja, ambos são somados e usados para definir o tamanho do buffer. No entanto, só uma parcela dos valores no buffer serão reemitidos, que corresponde aos N ultimos valores do buffer, onde N é o valor definido como replay
.
Ou seja, caso ao invés de definir o extraBufferCapacity = 1
, fosse definido o replay = 1
, o código teria o mesmo comportamento de não suspender na chamada do emit
, mas apenas até alcançar o limite do buffer, que será de 1 (o mesmo valor de replay
), assim como ocorre com o extraBufferCapacity
. No entanto, o replay
incorre em reemissão dos elementos, o que pode ser indesejado.
Como já dito, o tamanho do buffer é equivalente a soma do replay
e do extraBufferCapacity
, ou seja, o código abaixo poderá emitir até 2 elementos (além do que já foi emitido) antes de suspender.
Saida:
onBufferOverflow
Essa propriedade é a que define o que ocorre quando uma nova emissão ocorrer porém não houver mais espaço no buffer, esse valor só pode ser definido se o valor de replay
ou extraBufferCapacity
for maior que zero.
Esse valor permite que, quando novas emissões ocorrerem e não houver mais espaço no buffer, ao invés de suspender a função de emissão (que é a opção padrão), o elemento mais novo ou mais antigo seja descartado.
Por exemplo:
Irá exibir:
Note que os elementos mais novos (que foram emitidos por último) foram descartados.
Já nesse caso:
O resultado é esse:
Nesse os valores descartados foram os mais antigos (os que foram emitidos primeiro).
Em ambos os códigos, a execução do programa irá ficar suspensa também, pois os dois primeiro consumidores estão consumindo valores até um total de 4, mas receberam apenas 2.
O comportamento é exatamente igual para o replay
, com a diferença que, além do buffer, também há a reemissão para novos assinantes.
Isso é extremamente importante para emissões que ocorrem a todo momento, sem parar, e os consumidores não conseguem acompanhar a produção. Apesar de parecer uma ótima estrutura para Streaming de midia, o SharedFlow<T>
, só continua a emissão quanto todos consumidores recebem os valores, então se apenas um consumidor estiver lento, todos outros são afetados, ou seja, não irão receber valores novos. No caso do SharedFlow<T>
, todos consumidores recebem os mesmos valores.
Para um caso, como o Streaming de midia, quando os consumidores podem receber os valores em momentos diferentes, e a emissão também não pode parar, o interessante é utilizar a função buffer
do Flow<T>
, ela permite que o buffering seja feito pelo consumidor e não pelo produtor, essa função dispõe das mesmas propriedades de capacidade e de selecionar a estratégia onBufferOverflow
.
StateFlow<T>
É um tipo de SharedFlow<T>
, com a diferença de ser orientado a fazer broadcast de mudança de estado.
Ao se inscrever em um StateFlow<T>
, o consumidor receberá o último valor armazenado como uma emissão, e novos valores serão emitidos para esses assinantes.
É importante ressaltar que a função StateFlow<T>.emit(T)
nunca suspende, já que um StateFlow<T>
é um SharedFlow<T>
com replay = 1
e onBufferOverflow = DROP_OLDEST
.
Essa estrutura é interessante para ser utilizada por código concorrente onde o estado seria armazenado utilizando uma referência atomica (AtomicReference<T>
), mas precisa também “assistir” as mudanças. Essa implementação pode ser usada para “ouvir” mudanças em uma TextBox, por exemplo, e espelhar essas mudanças em um Label, enquanto outro consumidor as envia para um servidor por websocket.
As aplicações são diversas, não irei mostrar exemplos pois é basicamente a mesma ideia que um SharedFlow<T>
com replay e descarte no caso de overflow, só que orientado a estados.
Cancellable Flow
A maioria dos Flow
construidos são canceláveis, ou seja, param de emitir elementos quando uma requisição de cancelamento é feita, seja manualmente ou por alguma tarefa ter sido cancelada no mesmo escopo (que pode ter sido cancelada por vários motivos, como uma exceção).
Por exemplo, o código abaixo emite números até que acaba sendo cancelado pois uma tarefa assincrona falhou com uma exceção:
Isso vai resultar em algo como:
E a emissão será cancelada.
Porém, dependendo das construções utilizadas para criar um Flow<T>
, ele pode não ser cancelável, para torna-lo precisamos chamar a função cancellable
, que irá verificar se a tarefa não foi cancelada, a cada emissão.
Flow<T>
produzidos usando flow
e as implementações de SharedFlow<T>
(bem como StateFlow<T>
) já são canceláveis por padrão.
Funções de alta ordem
O Flow<T>
dispõe de diversas funções de alta ordem, como map
, flatMap
, filter
, reduce
.
Porém, apenas algumas suspendem, como é o caso do reduce
e do collect
, essas funções causam a iniciação da emissão dos elementos, enquanto que as funções que não suspendem apenas retornam um novo Flow<T>
que utiliza a emissão do anterior junto com alguma operação, como map ou filter, para emitir seguindo as regras descritas nas funções chamadas.
Caso você queira entender melhor das funções de alta ordem, eu escrevi um artigo que fala sobre elas:
As mesmas ideias podem ser aplicadas ao Flow
.
CallbackFlow<T>
Uma variação do Flow<T>
para casos onde os valores são produzidos por uma callback, que normalmente está rodando de forma concorrente. Normalmente útil quando o produtor dos valores é um código externo governado por callbacks.
Veja abaixo um exemplo extremamente simples:
O callbackFlow
fornece um canal para onde as emissões devem ser feitas, ao mesmo tempo que fornece uma API para que seja possível fazer o cancelamento dessa produção quando o Flow<T>
for fechado, seja por cancelamento ou por fim do consumo dos valores.
Channel
Uma abstração de nível mais baixo que o Flow
, os canais servem de meio de comunicação entre dois códigos concorrentes, por exemplo:
No caso do Channel
, temos uma ponta produtora e uma ponta consumidora, e ambas devem levar em consideração que o canal pode ser fechado em algum momento, e com isso, fechar qualquer recurso aberto também.
O modo padrão que um canal trabalha é o RENDEZVOUS, onde quem envia (chamando alguma variação de send
) suspende até que alguém receba esse valor (chamando alguma variação de receive
), o mesmo se aplica a quem recebe, que irá suspender na chamada de receive
(ou alguma variação) até que alguém produza um valor (usando o send
).
Existem também outros modos, como buffering e estratégias para BufferOverflow, como suspender quando houver overflow ou dropar o elemento mais recente ou mais antigo.
No entanto, o uso de Channel
muitas vezes pode ser evitado ao utilizar o Flow
, que é muito mais seguro e simples de usar. Em códigos maiores pode ser mais dificil debugar códigos que utilizem o Channel
do que os que utilizam Flow
, pois o Channel
depende muito mais de cooperação entre diferentes pedaços de código.
Porém existem sim momentos onde o Channel
será a melhor solução, mas isso cabe ao programador identificar esses momentos, com base na sua experiencia.
Select Expression
Esse é um recurso que permite que, dado diversas tarefas, obtemos o valor da que terminar primeiro, por exemplo:
Irá retornar Foo after 700
.
É interessante notar que as demais tarefas não irão ser canceladas quando a primeira retornar, porém se alguma tarefa falhar, o select
irá reproduzir esse erro, mesmo que a outra tarefa venha a completar com sucesso futuramente.
Para ver quais as funções são suportadas no select, veja a documentação:
Mutex e Semaphore
Agora vamos falar dos Mutexes e do Semaphore.
Mutex
Mutexes são como os Locks, porém não são re-entrantes, ou seja, não é possível chamar lock
mais de uma vez na mesma Thread/Coroutine. Apenas uma Coroutine mantém o lock
e todas outras suspendem enquanto o lock não é liberado.
O Mutex é necessário pois não há garantia de que uma Coroutine será resumida na mesma Thread em que ela foi iniciada, o que torna o Lock
totalmente inutil para esses casos.
No caso acima, apenas uma Coroutine poderá ler ou escrever de cada vez, o acesso nunca ocorrerá de forma concorrente.
Mutex é uma forma de garantir acesso e modificação a estruturas sincronas que não suportam concorrencia, por qualquer motivo que seja.
Porém, o Mutex é muito mais lento que um Lock, por diversos motivos, inclusive por ausencia de otimizações JIT, que ocorrem com o Lock mas podem nunca ocorrer com Mutex, que não é uma estrutura nativa do Java. Por isso, e por muitos outros motivos, é interessante manter seu código altamente concorrente e fazer o mesmo com todas estruturas utilizadas por ele.
O uso de estruturas concorrentes, como ConcurrentHashMap
e outras fornecidas pelo pacote java.util.concurrent
do java.base
é extremamente importante para manter uma boa performance de códigos concorrentes.
Semaphore
É como um Mutex, porém com uma quantidade predefinida de espaços, e ao invés de ter um Lock, você tem um Permit, que é basicamente um Lock.
Com o Semaphore, você consegue ter até N Coroutines acessando um mesmo recurso ao mesmo tempo. Isso pode ser utilizado para controlar a quantidade de requisições feitas a um banco de dados ao mesmo tempo, por exemplo.
O código abaixo mostra um exemplo bem simples de uso do Semaphore:
Esse código ira retornar algo próximo de:
Apenas 2 Coroutines serão capazes de retornar o valor dentro de um periodo de 1s, a outra Coroutine terá que esperar até ter o Permit para poder executar.
Podemos também utilizar o tryAcquire
(um amigo do tryLock
do Mutex), que retorna true
caso um Permit possa ser adquirido no momento da chamada, ou seja, ainda existam permissões no Semaphore
, e por meio disso podemos codificar nosso resultado para refletir nisso:
O resultado será algo próximo de:
Porque objetos próprio para as Coroutines?
Uma das dúvidas que pode surgir, é por qual motivo não utilizar o Lock
e o Sempahore
já disponível na linguagem Java.
As construções fornecidas pelo Java são voltadas para as Threads, enquanto as do Kotlin, para as Coroutines, as Coroutines podem ser iniciadas na Thread A, resumidas na Thread B e depois suspensas, e finalizar sua execução na Thread C, ou qualquer outra ordem, elas não são ligadas a uma Thread, então usar as construções do Java podem resultar em deadlocks.
Outro ponto é que as construções do Java bloqueam a Thread, o que iria fazer com que qualquer outra Coroutine que precisasse ser executada, não possa ser, já que a Thread não foi liberada. Por isso as construções do Kotlin usam suspensão ao invés de bloquear, pois elas podem suspender até o Mutex ou Semaphore ser liberado ou ter espaço disponível, sem ter que travar nenhuma Thread.
E por isso, costumam ser mais lentas, mas nada absurdo, porém impactante para algoritmos de alta performance. A ausência de otimização pela Runtime do Java é devido as Coroutines serem um recurso extremamente novo e desconhecido pela Runtime, inclusive, a maior parte das Coroutines são implementadas por uma Runtime própria do Kotlin, que as gerencia. Isso tira um pouco a possibilidade da JVM aplicar as mesmas otimizações já aplicada a Threads, mas isso pode mudar com o projeto Loom.
Actor
Por fim, vamos falar dos atores, que infelizmente estão marcados como @ObsoleteCoroutinesApi
devido a issue #87
Porém ainda não há alternativas implementadas, então ainda podem ser considerados para serem usados, em casos específicos.
Atores são uma forma de troca de mensagens entre Coroutines, e podem ser utilizados para implementar o compartilhamento de estados de forma concorrente, quando esses estados envolvem a manipulação de estruturas complexas. Atores não são a melhor escolha para operações simples, como incrementar um Int, visto que eles tem um certo custo que precisa se pagar por meio dos ganhos que podem oferecer.
No caso de operações simples, como incrementar um número, construções como AtomicInteger
, AtomicLong
, AtomicDouble
fornecidas pelo pacote java.util.concurrent.atomic
deve ser suficiente.
Você pode ler um pouco sobre essas construções nesse meu artigo:
Um exemplo mostrado na própria documentação do Kotlin é para a incrementação de um número, como você pode ver no documento abaixo:
Porém, outro cenário em que achei extremamente útil foi para aplicar operações mais complexas em coleções, e garantir que ela não mude durante essas operações:
Com os Actors é possível abstrair operações em mensagens, que ao completarem passam essa informação para uma callback, ou um CompletableDeferred.
No caso acima, eu poderia implementar outras formas de busca que precisam bloquear o recurso durante o tempo em que a busca ocorre, sem a necessidade de utilizar Mutex, por exemplo.
Uma das vantagens dos atores é que suas construções são feitas em volta do mecanismo das Coroutines, então é possível chamar outras funções suspend
dentro da lógica que trata as mensagens, suspendendo o ator, e o uso de CompletableDeferred
permite que o ponto que precisa desse valor também seja suspendido. Outra vantagem é que os atores herdam o escopo em que são criados, então, caso o escopo seja finalizado/cancelado, o ator também será.
Os atores encapsulam um Canal, esse canal também tem um capacity
que é definido na criação do ator e é utilizado como valor do buffer de mensagens desse canal.
No entanto, atores tem um custo de performance considerável, então o seu uso deve pagar esse custo, ou esse custo deve ser irrelevante para os beneficios que o seu uso traz.
Recomendo os seguintes documentos para leitura sobre atores:
Conclusão
Nesse artigo entendemos a fundo conceitos como concorrencia, paralelismo, assincronismo, código não-bloqueante, o papel das Coroutines e seu poder.
A partir dessa base, podemos criar código concorrente e assincrono de forma segura e rápida, com um trabalho menor do que a utilização de programação Reativa, e com um código muito mais simples e limpo.