RxPY - Trabalhando com Observáveis

Um observável é uma função que cria um observador e o anexa à fonte onde os valores são esperados, por exemplo, cliques, eventos de mouse de um elemento dom, etc.

Os tópicos mencionados abaixo serão estudados detalhadamente neste capítulo.

  • Criar observáveis

  • Assine e execute um observável

Crie observáveis

Para criar um observável, usaremos create() método e passar a função para ele que contém os seguintes itens.

  • on_next() - Esta função é chamada quando o Observable emite um item.

  • on_completed() - Esta função é chamada quando o Observable é concluído.

  • on_error() - Esta função é chamada quando ocorre um erro no Observable.

Para trabalhar com o método create (), primeiro importe o método conforme mostrado abaixo -

from rx import create

Aqui está um exemplo prático, para criar um observável -

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_error("Error")
   observer.on_completed()
source = create(test_observable).

Assine e execute um observável

Para assinar um observável, precisamos usar a função subscribe () e passar a função de retorno de chamada on_next, on_error e on_completed.

Aqui está um exemplo prático -

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_completed()
source = create(test_observable)
source.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

O método subscribe () cuida da execução do observável. A função de retorno de chamadaon_next, on_error e on_completeddeve ser passado para o método de inscrição. A chamada para o método de inscrição, por sua vez, executa a função test_observable ().

Não é obrigatório passar todas as três funções de retorno de chamada para o método subscribe (). Você pode passar de acordo com seus requisitos para on_next (), on_error () e on_completed ().

A função lambda é usada para on_next, on_error e on_completed. Ele pegará os argumentos e executará a expressão fornecida.

Aqui está a saída do observável criado -

E:\pyrx>python testrx.py
Got - Hello
Job Done!