PySpark - Transmissão e Acumulador

Para processamento paralelo, o Apache Spark usa variáveis ​​compartilhadas. Uma cópia da variável compartilhada vai em cada nó do cluster quando o driver envia uma tarefa para o executor no cluster, para que ele possa ser usado para executar tarefas.

Existem dois tipos de variáveis ​​compartilhadas com suporte pelo Apache Spark -

  • Broadcast
  • Accumulator

Vamos entendê-los em detalhes.

Broadcast

Variáveis ​​de broadcast são usadas para salvar a cópia dos dados em todos os nós. Esta variável é armazenada em cache em todas as máquinas e não enviada em máquinas com tarefas. O bloco de código a seguir contém os detalhes de uma classe Broadcast para PySpark.

class pyspark.Broadcast (
   sc = None, 
   value = None, 
   pickle_registry = None, 
   path = None
)

O exemplo a seguir mostra como usar uma variável Broadcast. Uma variável de transmissão possui um atributo chamado valor, que armazena os dados e é usado para retornar um valor transmitido.

----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Broadcast app") 
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) 
data = words_new.value 
print "Stored data -> %s" % (data) 
elem = words_new.value[2] 
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------

Command - O comando para uma variável de transmissão é o seguinte -

$SPARK_HOME/bin/spark-submit broadcast.py

Output - A saída para o seguinte comando é fornecida abaixo.

Stored data -> [
   'scala',  
   'java', 
   'hadoop', 
   'spark', 
   'akka'
]
Printing a particular element in RDD -> hadoop

Acumulador

Variáveis ​​acumuladoras são utilizadas para agregar as informações por meio de operações associativas e comutativas. Por exemplo, você pode usar um acumulador para uma operação de soma ou contadores (em MapReduce). O bloco de código a seguir contém os detalhes de uma classe de acumulador para PySpark.

class pyspark.Accumulator(aid, value, accum_param)

O exemplo a seguir mostra como usar uma variável de acumulador. Uma variável de acumulador tem um atributo chamado valor que é semelhante ao que uma variável de transmissão tem. Ele armazena os dados e é usado para retornar o valor do acumulador, mas pode ser usado apenas em um programa de driver.

Neste exemplo, uma variável de acumulador é usada por vários trabalhadores e retorna um valor acumulado.

----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Accumulator app") 
num = sc.accumulator(10) 
def f(x): 
   global num 
   num+=x 
rdd = sc.parallelize([20,30,40,50]) 
rdd.foreach(f) 
final = num.value 
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------

Command - O comando para uma variável de acumulador é o seguinte -

$SPARK_HOME/bin/spark-submit accumulator.py

Output - A saída para o comando acima é fornecida abaixo.

Accumulated value is -> 150