Spark SQL - DataFrames

Um DataFrame é uma coleção distribuída de dados, que é organizada em colunas nomeadas. Conceitualmente, é equivalente a tabelas relacionais com boas técnicas de otimização.

Um DataFrame pode ser construído a partir de uma matriz de fontes diferentes, como tabelas Hive, arquivos de dados estruturados, bancos de dados externos ou RDDs existentes. Esta API foi projetada para aplicativos modernos de Big Data e ciência de dados, inspirando-se emDataFrame in R Programming e Pandas in Python.

Recursos do DataFrame

Aqui está um conjunto de alguns recursos característicos do DataFrame -

  • Capacidade de processar os dados no tamanho de Kilobytes a Petabytes em um cluster de nó único para cluster grande.

  • Suporta diferentes formatos de dados (Avro, csv, elastic search e Cassandra) e sistemas de armazenamento (HDFS, tabelas HIVE, mysql, etc).

  • Otimização de última geração e geração de código por meio do otimizador Spark SQL Catalyst (estrutura de transformação de árvore).

  • Pode ser facilmente integrado com todas as ferramentas e estruturas de Big Data via Spark-Core.

  • Fornece API para programação Python, Java, Scala e R.

SQLContext

SQLContext é uma classe e é usado para inicializar as funcionalidades do Spark SQL. O objeto de classe SparkContext (sc) é necessário para inicializar o objeto de classe SQLContext.

O seguinte comando é usado para inicializar o SparkContext por meio do spark-shell.

$ spark-shell

Por padrão, o objeto SparkContext é inicializado com o nome sc quando a faísca começa.

Use o seguinte comando para criar SQLContext.

scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

Exemplo

Vamos considerar um exemplo de registros de funcionários em um arquivo JSON chamado employee.json. Use os comandos a seguir para criar um DataFrame (df) e ler um documento JSON chamadoemployee.json com o seguinte conteúdo.

employee.json - Coloque este arquivo no diretório onde o atual scala> o ponteiro está localizado.

{
   {"id" : "1201", "name" : "satish", "age" : "25"}
   {"id" : "1202", "name" : "krishna", "age" : "28"}
   {"id" : "1203", "name" : "amith", "age" : "39"}
   {"id" : "1204", "name" : "javed", "age" : "23"}
   {"id" : "1205", "name" : "prudvi", "age" : "23"}
}

Operações de DataFrame

DataFrame fornece uma linguagem específica de domínio para manipulação de dados estruturados. Aqui, incluímos alguns exemplos básicos de processamento de dados estruturados usando DataFrames.

Siga as etapas fornecidas abaixo para realizar operações DataFrame -

Leia o documento JSON

Primeiro, temos que ler o documento JSON. Com base nisso, gere um DataFrame denominado (dfs).

Use o seguinte comando para ler o documento JSON denominado employee.json. Os dados são mostrados como uma tabela com os campos - id, nome e idade.

scala> val dfs = sqlContext.read.json("employee.json")

Output - Os nomes dos campos são retirados automaticamente de employee.json.

dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]

Mostrar os dados

Se você quiser ver os dados no DataFrame, use o seguinte comando.

scala> dfs.show()

Output - Você pode ver os dados do funcionário em um formato tabular.

<console>:22, took 0.052610 s
+----+------+--------+
|age | id   |  name  |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
| 23 | 1204 | javed  |
| 23 | 1205 | prudvi |
+----+------+--------+

Use o método printSchema

Se você quiser ver a Estrutura (Esquema) do DataFrame, use o seguinte comando.

scala> dfs.printSchema()

Output

root
   |-- age: string (nullable = true)
   |-- id: string (nullable = true)
   |-- name: string (nullable = true)

Use Select Method

Use o seguinte comando para buscar name-coluna entre três colunas do DataFrame.

scala> dfs.select("name").show()

Output - Você pode ver os valores do name coluna.

<console>:22, took 0.044023 s
+--------+
|  name  |
+--------+
| satish |
| krishna|
| amith  |
| javed  |
| prudvi |
+--------+

Usar filtro de idade

Use o seguinte comando para localizar os funcionários com idade superior a 23 (idade> 23).

scala> dfs.filter(dfs("age") > 23).show()

Output

<console>:22, took 0.078670 s
+----+------+--------+
|age | id   | name   |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
+----+------+--------+

Use o método groupBy

Use o seguinte comando para contar o número de funcionários que têm a mesma idade.

scala> dfs.groupBy("age").count().show()

Output - dois funcionários têm 23 anos.

<console>:22, took 5.196091 s
+----+-----+
|age |count|
+----+-----+
| 23 |  2  |
| 25 |  1  |
| 28 |  1  |
| 39 |  1  |
+----+-----+

Execução de consultas SQL programaticamente

Um SQLContext permite que os aplicativos executem consultas SQL programaticamente enquanto executam funções SQL e retorna o resultado como um DataFrame.

Geralmente, em segundo plano, SparkSQL oferece suporte a dois métodos diferentes para converter RDDs existentes em DataFrames -

Sr. Não Métodos e Descrição
1 Inferindo o esquema usando reflexão

Este método usa reflexão para gerar o esquema de um RDD que contém tipos específicos de objetos.

2 Especificando programaticamente o esquema

O segundo método para criar DataFrame é por meio de uma interface programática que permite construir um esquema e aplicá-lo a um RDD existente.