Quer compartilhar seu conteúdo em R-bloggers? clique aqui se você tiver um blog, ou aqui se não tiver.
o {sparklyr}
O pacote nos permite conectar e usar o Apache Spark para cálculos de alto desempenho, altamente paralelizados e distribuídos. Também podemos usar os recursos do Spark para melhorar e agilizar nossos pipelines de processamento de dados, pois o Spark oferece suporte à leitura e gravação de muitas fontes populares, como Parquet, Orc, etc. e a maioria dos sistemas de banco de dados por meio de drivers JDBC.
Nesta postagem, exploraremos o uso de R para realizar carregamentos de dados no Spark e, opcionalmente, R de sistemas de gerenciamento de banco de dados relacional como MySQL, Oracle e MS SQL Server e mostraremos como tais processos podem ser simplificados. Também forneceremos código reproduzível por meio de uma imagem Docker, de forma que os leitores interessados possam experimentá-lo facilmente.
Se você estiver interessado apenas na parte de carregamento do Spark, fique à vontade para pular este parágrafo.
Para um exemplo totalmente reproduzível, usaremos uma instância do servidor MySQL local, pois devido à sua natureza de código aberto, é muito acessível. Vamos usar o {DBI}
e {RMySQL}
pacotes para se conectar ao servidor diretamente de R e preencher um banco de dados com dados fornecidos pelo {nycflights13}
pacote que usaremos mais tarde para nossas cargas Spark.
Vamos escrever o flights
quadro de dados no banco de dados MySQL usando {DBI}
e chamar a tabela recém-criada test_table
:
test_df
Agora temos nossa mesa disponível e podemos nos concentrar na parte principal do artigo.
Contents
- 1 Obter um driver JDBC e usá-lo com Spark e sparklyr
- 2 Recuperando dados de um banco de dados com sparklyr
- 3 Configurando o options argumento de spark_read_jdbc()
- 4 Carregando uma tabela de banco de dados específica
- 5 Executar uma consulta
- 6 Oráculo
- 7 MS SQL Server
- 8 Ainda mais sistemas RDBM
- 9 o memory argumento
- 10 Particionamento
Obter um driver JDBC e usá-lo com Spark e sparklyr
Como o Spark é executado por meio de um JVM, a maneira natural de estabelecer conexões com sistemas de banco de dados é usando Java Database Connectivity (JDBC). Para fazer isso, precisaremos de um driver JDBC que nos permitirá interagir com o sistema de banco de dados de nossa escolha. Para este exemplo, estamos usando MySQL, mas forneceremos detalhes sobre outro RDBMS posteriormente neste artigo.
Baixar e extrair o jar do conector
Com um pouco de pesquisa online, podemos baixar o driver e extrair o conteúdo do arquivo zip:
mkdir -p $HOME/jars
wget -q -t 3
-O $HOME/jars/mysql-connector.zip
https://cdn.mysql.com/Downloads/Connector-J/mysql-connector-java-8.0.21.zip
unzip -q -o
-d $HOME/jars
$HOME/jars/mysql-connector.zip
Agora, o arquivo no qual estamos mais interessados em nosso caso de uso, .jar
arquivo que contém as classes necessárias para estabelecer a conexão. Usando R, podemos localizar o (s) arquivo (s) jar extraído (s), por exemplo, usando o dir()
função:
jars
## [1] "mysql-connector-java-8.0.21.jar"
Conectando usando o jar
Em seguida, precisamos contar {sparklyr}
para usar esse recurso ao estabelecer uma conexão Spark, por exemplo, adicionando um sparklyr.jars.default
elemento com os caminhos para os arquivos jar necessários para o config
lista e, finalmente, estabelecer a conexão Spark usando nosso config
:
config
Recuperando dados de um banco de dados com sparklyr
Com a conexão Spark estabelecida, podemos nos conectar ao nosso banco de dados MySQL do Spark e recuperar os dados. {sparklyr}
fornece um prático spark_read_jdbc()
função para este propósito exato. A API mapeia de perto para a API Scala, mas não é muito explícita em como configurar a conexão. A chave aqui é o options
argumento para spark_read_jdbc()
, que especificará todos os detalhes de conexão de que precisamos.
Configurando o options
argumento de spark_read_jdbc()
Primeiro, vamos criar um jdbcConnectionOpts
lista com as propriedades básicas de conexão. Estes são o URL de conexão e o driver. Abaixo, também especificamos explicitamente o user
e password
, mas geralmente também podem ser fornecidos como parte do URL:
# Connection options
jdbcConnectionOpts
A última informação que precisamos fornecer é a identificação dos dados que queremos extrair assim que a conexão for estabelecida. Para isso, podemos usar uma das duas opções:
dbtable
– no caso de desejarmos criar um Spark DataFrame extraindo o conteúdo de uma tabela específicaquery
– caso desejemos criar um Spark DataFrame executando uma consulta SQL
Carregando uma tabela de banco de dados específica
Primeiro vamos com a opção de carregar uma tabela de banco de dados que preenchemos com os voos anteriores e nomeada test_table
, juntando tudo e carregando os dados usando spark_read_jdbc()
:
# Other options specific to the action
jdbcDataOpts
## # Source: spark [?? x 20]
## row_names year month day dep_time sched_dep_time dep_delay arr_time
##
## 1 1 2013 1 1 517 515 2 830
## 2 2 2013 1 1 533 529 4 850
## 3 3 2013 1 1 542 540 2 923
## 4 4 2013 1 1 544 545 -1 1004
## 5 5 2013 1 1 554 600 -6 812
## 6 6 2013 1 1 554 558 -4 740
## 7 7 2013 1 1 555 600 -5 913
## 8 8 2013 1 1 557 600 -3 709
## 9 9 2013 1 1 557 600 -3 838
## 10 10 2013 1 1 558 600 -2 753
## # … with more rows, and 12 more variables: sched_arr_time ,
## # arr_delay , carrier , flight , tailnum ,
## # origin , dest , air_time , distance , hour ,
## # minute , time_hour
Fornecemos os seguintes argumentos:
sc
é a conexão Spark que estabelecemos usando a configuração que inclui os jars necessáriosname
é uma string de caracteres com o nome a ser atribuído à tabela recém-gerada no Spark SQL, não o nome da tabela de origem que queremos ler de nosso banco de dadosoptions
é uma lista com as opções de conexão e as opções relacionadas aos dados, por isso usamosappend()
para combinar ojdbcConnectionOpts
ejdbcDataOpts
listas em umamemory
é uma lógica que informa ao Spark se queremos armazenar a tabela em cache na memória. Um pouco mais sobre isso e algumas implicações de desempenho abaixo
Executar uma consulta
Mencionamos acima que, além de apenas carregar uma tabela, também podemos escolher executar uma consulta SQL e usar seu resultado como fonte para nosso Spark DtaFrame. Aqui está um exemplo simples disso.
# Use `query` instead of `dbtable`
jdbcDataOpts
## # Source: spark [?? x 20]
## row_names year month day dep_time sched_dep_time dep_delay arr_time
##
## 1 1 2013 1 1 517 515 2 830
## 2 6570 2013 1 8 1435 1440 -5 1717
## 3 7111 2013 1 9 717 700 17 812
## 4 7349 2013 1 9 1143 1144 -1 1425
## 5 10593 2013 1 13 835 824 11 1030
## 6 13775 2013 1 16 1829 1730 59 2117
## 7 18967 2013 1 22 1902 1808 54 2214
## 8 19417 2013 1 23 1050 1056 -6 1143
## 9 19648 2013 1 23 1533 1529 4 1641
## 10 21046 2013 1 25 724 720 4 1000
## # … with more rows, and 12 more variables: sched_arr_time ,
## # arr_delay , carrier , flight , tailnum ,
## # origin , dest , air_time , distance , hour ,
## # minute , time_hour
Observe que o único elemento que mudou é o
jdbcDataOpts
lista, que agora contém umquery
elemento em vez de umdbtable
elemento.
Nosso exemplo de brinquedo com MySQL funcionou bem, mas na prática, podemos precisar acessar dados em outros sistemas RDBM populares, como Oracle, MS SQL Server e outros. O padrão que mostramos acima, entretanto, permanece, já que o design da API é o mesmo, independentemente do sistema em questão.
Em geral, precisaremos de 3 elementos para conectar com sucesso:
- Um driver JDBC especificado e os recursos fornecidos para
{sparklyr}
noconfig
argumento despark_connect()
, geralmente na forma de caminhos para arquivos .jar contendo os recursos necessários - Um URL de conexão que dependerá do sistema e de outras especificações de configuração
- Por último, mas não menos importante, todos os pré-requisitos técnicos e de infraestrutura, como credenciais com os direitos de acesso adequados, o host acessível a partir do cluster Spark, etc.
Agora, alguns exemplos com os quais trabalhamos no passado e com os quais tivemos sucesso.
Oráculo
Driver Oracle JDBC
Os drivers podem ser baixados (após o login) do site da Oracle e o nome do driver geralmente é "oracle.jdbc.driver.OracleDriver"
. Certifique-se de usar a versão apropriada.
Usando identificação de host totalmente qualificada
hostName
Usando tnsnames.ora
o tnsnames.ora
file é um arquivo de configuração que contém nomes de serviço de rede mapeados para conectar descritores para o método de nomenclatura local ou nomes de serviço de rede mapeados para endereços de protocolo de ouvinte. Com isso implementado, podemos usar apenas o nome do serviço em vez de host, porta e identificação de serviço totalmente qualificados, por exemplo:
serviceName
Analisando tipos de dados especiais
Observe que o driver JDBC sozinho pode não ser suficiente para analisar todos os tipos de dados em um banco de dados Oracle. Por exemplo, analisar o XMLType
muito provavelmente exigirá xmlparserv2.jar
, e xdb.jar
junto com o adequado ojdbc*.jar
.
MS SQL Server
Driver JDBC do MS SQL Server
Os drivers para diferentes versões do JRE podem ser baixados do site Download Microsoft JDBC Driver para SQL Server. Novamente, certifique-se de que a versão do JRE corresponda à que você usa em seus ambientes.
Opções de conexão do MS SQL Server
serverName
Ainda mais sistemas RDBM

Logotipos de R, sparklyr, Spark e sistemas RDBMS selecionados
Vlad Mihalcea escreveu um artigo muito útil sobre strings de URL de conexão do driver JDBC que contém os detalhes da URL de conexão para vários outros sistemas de banco de dados comuns.
o memory
argumento
o memory
argumento para spark_read_jdbc()
pode ser muito importante quando o desempenho é de interesse. O que acontece ao usar o padrão memory = TRUE
é que a tabela no contexto do Spark SQL é armazenada em cache usando CACHE TABLE
e um SELECT count(*) FROM
a consulta é executada na tabela em cache. Isso força o Spark a executar a ação de carregar a tabela inteira na memória.
Dependendo do nosso caso de uso, pode ser muito mais benéfico usar memory = FALSE
e apenas armazenar em cache na memória Spark as partes da tabela (ou resultados processados) de que precisamos, já que as operações mais dispendiosas geralmente são transferências de dados pela rede. Transferir o mínimo de dados possível do banco de dados para a memória Spark pode trazer benefícios de desempenho significativos.
Isso é um pouco difícil de mostrar com nosso exemplo de brinquedo, já que tudo está acontecendo fisicamente dentro do mesmo contêiner (e, portanto, o mesmo sistema de arquivos), mas diferenças podem ser observadas mesmo com esta configuração e nosso pequeno conjunto de dados:
microbenchmark::microbenchmark(
times = 10,
setup = {
library(dplyr)
library(sparklyr)
sparklyr::spark_disconnect_all()
sc %
filter(tailnum == "N14228") %>%
select(tailnum, distance) %>%
compute("test")
},
# with memory=FALSE
lazy = {
two %
filter(tailnum == "N14228") %>%
select(tailnum, distance) %>%
compute("test")
}
)
# Unit: seconds
# expr min lq mean median uq max neval
# eager 15.460844 16.24838 17.07560 17.03592 17.88299 18.73005 10
# lazy 9.821039 10.12435 10.40718 10.42766 10.70024 10.97283 10
Vemos que a abordagem “preguiçosa”, que não armazena em cache a tabela inteira na memória, produziu o resultado cerca de 41% mais rápido. Obviamente, isso não é um benchmark relevante para cargas de dados da vida real, mas pode fornecer algumas dicas sobre como otimizar as cargas.
Particionamento
Particionar os dados pode trazer um aumento de desempenho muito significativo e veremos como configurá-lo e otimizá-lo em detalhes em um artigo separado.
Se você tiver Docker disponível, executar o seguinte deve gerar um contêiner Docker com RStudio Server exposto na porta 8787, para que você possa abrir seu navegador da web em http://localhost:8787
para acessá-lo e experimentar o código. O nome do usuário é rstudio
e a senha é a que você escolher abaixo:
docker run -d -p 8787:8787 -e PASSWORD=pass jozefhajnala/jozefio
Relacionados