Programação avançada do Spark

Spark contém dois tipos diferentes de variáveis ​​compartilhadas - uma é broadcast variables e o segundo é accumulators.

  • Broadcast variables - usado para distribuir grandes valores com eficiência.

  • Accumulators - usado para agregar as informações de uma coleção particular.

Variáveis ​​de transmissão

Variáveis ​​de broadcast permitem que o programador mantenha uma variável somente leitura em cache em cada máquina, em vez de enviar uma cópia dela com as tarefas. Eles podem ser usados, por exemplo, para dar a cada nó, uma cópia de um grande conjunto de dados de entrada, de maneira eficiente. O Spark também tenta distribuir variáveis ​​de transmissão usando algoritmos de transmissão eficientes para reduzir o custo de comunicação.

As ações do Spark são executadas por meio de um conjunto de estágios, separados por operações "shuffle" distribuídas. O Spark transmite automaticamente os dados comuns necessários às tarefas em cada estágio.

Os dados transmitidos dessa maneira são armazenados em cache na forma serializada e são desserializados antes de executar cada tarefa. Isso significa que criar variáveis ​​de transmissão explicitamente só é útil quando as tarefas em vários estágios precisam dos mesmos dados ou quando é importante armazenar os dados em cache na forma desserializada.

Variáveis ​​de transmissão são criadas a partir de uma variável v chamando SparkContext.broadcast(v). A variável de transmissão é um wrapper ao redorv, e seu valor pode ser acessado chamando o valuemétodo. O código fornecido abaixo mostra isso -

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))

Output -

broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

Depois que a variável de transmissão é criada, ela deve ser usada em vez do valor v em qualquer função executada no cluster, de modo que vnão é enviado para os nós mais de uma vez. Além disso, o objetov não deve ser modificado após sua transmissão, para garantir que todos os nós obtenham o mesmo valor da variável de transmissão.

Acumuladores

Acumuladores são variáveis ​​que só são “adicionadas” por meio de uma operação associativa e, portanto, podem ser eficientemente suportados em paralelo. Eles podem ser usados ​​para implementar contadores (como no MapReduce) ou somas. O Spark oferece suporte nativo para acumuladores de tipos numéricos, e os programadores podem adicionar suporte para novos tipos. Se os acumuladores forem criados com um nome, eles serão exibidos emSpark’s UI. Isso pode ser útil para entender o progresso dos estágios em execução (NOTA - ainda não é compatível com Python).

Um acumulador é criado a partir de um valor inicial v chamando SparkContext.accumulator(v). As tarefas em execução no cluster podem ser adicionadas a ele usando oaddou o operador + = (em Scala e Python). No entanto, eles não podem ler seu valor. Apenas o programa driver pode ler o valor do acumulador, usando seuvalue método.

O código fornecido a seguir mostra um acumulador sendo usado para adicionar os elementos de um array -

scala> val accum = sc.accumulator(0) 
 
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

Se você quiser ver a saída do código acima, use o seguinte comando -

scala> accum.value

Resultado

res2: Int = 10

Operações numéricas de RDD

O Spark permite que você faça diferentes operações em dados numéricos, usando um dos métodos de API predefinidos. As operações numéricas do Spark são implementadas com um algoritmo de streaming que permite construir o modelo, um elemento por vez.

Essas operações são calculadas e retornadas como um StatusCounter Objeto chamando status() método.

A seguir está uma lista de métodos numéricos disponíveis em StatusCounter.

S.Não Métodos e Significado
1

count()

Número de elementos do RDD.

2

Mean()

Média dos elementos do RDD.

3

Sum()

Valor total dos elementos do RDD.

4

Max()

Valor máximo entre todos os elementos do RDD.

5

Min()

Valor mínimo entre todos os elementos do RDD.

6

Variance()

Variância dos elementos.

7

Stdev()

Desvio padrão.

Se você quiser usar apenas um desses métodos, pode chamar o método correspondente diretamente no RDD.