RxPY - Criando Observáveis

crio

Este método é usado para criar um observável. Terá o método do observador, ou seja

  • on_next() - Esta função é chamada, quando o Observable emite um item.

  • on_completed() - Esta função é chamada, quando o Observable é concluído.

  • on_error() - Esta função é chamada quando ocorre um erro no Observable.

Aqui está um exemplo prático -

testrx.py

from rx import create
def test_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_error("Error occured")
   observer.on_completed()
source = create(test_observable)
source.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

Aqui está o output do observável criado -

E:\pyrx>python testrx.py
Got - Hello
Job Done!

vazio

Este observável não produzirá nada e emitirá diretamente o estado completo.

Sintaxe

empty()

Valor de retorno

Ele retornará um observável sem elementos.

Exemplo

from rx import empty
test = empty()
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

Resultado

E:\pyrx>python testrx.py
Job Done!

Nunca

Este método cria um observável que nunca alcançará o estado completo.

Sintaxe

never()

Valor de retorno

Ele retornará um observável que nunca será concluído.

Exemplo

from rx import never
test = never()
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

Resultado

It does not show any output.

lançar

Este método criará um observável que gerará um erro.

Sintaxe

throw(exception)

Parâmetros

exceção: um objeto que contém detalhes do erro.

Valor de retorno

Um observável é retornado com detalhes do erro.

Exemplo

from rx import throw
test = throw(Exception('There is an Error!'))
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

Resultado

E:\pyrx>python testrx.py
Error: There is an Error!

de_

Este método converterá a matriz ou objeto fornecido em um observável.

Sintaxe

from_(iterator)

Parâmetros

iterador: este é um objeto ou array.

Valor de retorno

Isso retornará um observável para o iterador fornecido.

Exemplo

from rx import from_
test = from_([1,2,3,4,5,6,7,8,9,10])
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

Resultado

E:\pyrx>python testrx.py
The value is 1
The value is 2
The value is 3
The value is 4
The value is 5
The value is 6
The value is 7
The value is 8
The value is 9
The value is 10
Job Done!

intervalo

Este método fornecerá uma série de valores produzidos após um tempo limite.

Sintaxe

interval(period)

Parâmetros

período: para iniciar a sequência inteira.

Valor de retorno

Ele retorna um observável com todos os valores em ordem sequencial.

Exemplo

import rx
from rx import operators as ops
rx.interval(1).pipe(
   ops.map(lambda i: i * i)
).subscribe(lambda x: print("The value is {0}".format(x)))
input("Press any key to exit\n")

Resultado

E:\pyrx>python testrx.py
Press any key to exit
The value is 0
The value is 1
The value is 4
The value is 9
The value is 16
The value is 25
The value is 36
The value is 49
The value is 64
The value is 81
The value is 100
The value is 121
The value is 144
The value is 169
The value is 196
The value is 225
The value is 256
The value is 289
The value is 324
The value is 361
The value is 400

somente

Este método converterá determinado valor em um observável.

Sintaxe

just(value)

Parâmetros

valor: a ser convertido em um observável.

Valor de retorno

Ele retornará um observável com os valores fornecidos.

Exemplo

from rx import just
test = just([15, 25,50, 55])
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

Resultado

E:\pyrx>python testrx.py
The value is [15, 25, 50, 55]
Job Done!

alcance

Este método fornecerá um intervalo de inteiros com base na entrada fornecida.

Sintaxe

range(start, stop=None)

Parâmetros

start: o primeiro valor a partir do qual o intervalo começará.

stop: opcional, o último valor para o intervalo parar.

Valor de retorno

Isso retornará um observável com valor inteiro com base na entrada fornecida.

Exemplo

from rx import range
test = range(0,10)
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

Resultado

E:\pyrx>python testrx.py
The value is 0
The value is 1
The value is 2
The value is 3
The value is 4
The value is 5
The value is 6
The value is 7
The value is 8
The value is 9
Job Done!

repeat_value

Este método criará um observável que repetirá o valor fornecido de acordo com a contagem fornecida.

Sintaxe

repeat_value(value=None, repeat_count=None)

Parâmetros

valor: opcional. O valor a ser repetido.

repeat_count: opcional. O número de vezes que o valor fornecido deve ser repetido.

Valor de retorno

Ele retornará um observável que repetirá o valor fornecido conforme a contagem fornecida.

Exemplo

from rx import repeat_value
test = repeat_value(44,10)
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

Resultado

E:\pyrx>python testrx.py
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
Job Done!

começar

Este método recebe uma função como entrada e retorna um valor observável que retornará o valor da função de entrada.

Sintaxe

start(func)

Parâmetros

func: uma função que será chamada.

Valor de retorno

Ele retorna um observável que terá um valor de retorno da função de entrada.

Exemplo

from rx import start
test = start(lambda : "Hello World")
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

Resultado

E:\pyrx>python testrx.py
The value is Hello World
Job Done!

cronômetro

Este método emitirá os valores em sequência após o tempo limite terminar.

Sintaxe

timer(duetime)

Parâmetros

duetime: tempo após o qual deve emitir o primeiro valor.

Valor de retorno

Ele retornará um observável com valores emitidos após o tempo.

Exemplo

import rx
from rx import operators as ops
rx.timer(5.0, 10).pipe(
   ops.map(lambda i: i * i)
).subscribe(lambda x: print("The value is {0}".format(x)))
input("Press any key to exit\n")

Resultado

E:\pyrx>python testrx.py
Press any key to exit
The value is 0
The value is 1
The value is 4
The value is 9
The value is 16
The value is 25
The value is 36
The value is 49
The value is 64