EDAP unstructured data lake ingestion: using PySpark to read PDF metadata, download the files, and write them to BOS storage
Last updated: 2024-11-27
Scenario
Based on the metadata of unstructured files stored in a data lake table, a PySpark job batch-pulls the corresponding PDFs from a file server into the lake. In this example, Delta Lake is used as the data lake format.
Data lake metadata
Delta Lake managed table
1. Schema
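The table schema is not reproduced here. Below is a minimal sketch of what the Delta Lake managed table could look like, assuming the four columns that the PySpark job in the next section reads and writes (id, name, url, bos_location); the column types are assumptions, not taken from the original document.

# Hypothetical DDL for the metadata table used by the job below;
# column names come from the job code, the types are assumptions.
spark.sql("""
    CREATE TABLE IF NOT EXISTS sanling.pdf_files (
        id            STRING,
        name          STRING,
        url           STRING,
        bos_location  STRING
    ) USING DELTA
""")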
Offline ingestion
SDK dependency
The BMR execution resources come with the bce python sdk preinstalled.
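If your execution environment does not already provide the SDK, it can be installed from PyPI (this is the public distribution of the BOS client used below):

pip install bce-python-sdk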
PySpark
import requests

from baidubce.bce_client_configuration import BceClientConfiguration
from baidubce.auth.bce_credentials import BceCredentials
from baidubce.services.bos.bos_client import BosClient

# BOS client configuration
bos_host = "bj.bcebos.com"
access_key_id = "${access_key}"
secret_access_key = "${secret_key}"
config = BceClientConfiguration(credentials=BceCredentials(access_key_id, secret_access_key), endpoint=bos_host)
bos_client = BosClient(config)

bucket_name = "${bucket_name}"
object_prefix = "${object_prefix}"  # example: /upload/

def process_row(row):
    """Download one PDF from its source URL, upload it to BOS, and return the updated row."""
    url = row.url
    name = row.name
    local_filename = name + ".pdf"

    # Stream the PDF from the file server to local disk
    response = requests.get(url, stream=True)
    response.raise_for_status()
    with open(local_filename, 'wb') as file_handle:
        for chunk in response.iter_content(chunk_size=8192):
            file_handle.write(chunk)

    # Upload the downloaded file to BOS
    object_key = object_prefix + local_filename
    bos_client.put_object_from_file(bucket_name, object_key, local_filename)

    # Record the BOS location of the object in the returned row
    bos_location = f"bos://{bucket_name}{object_key}"
    return (row.id, row.name, row.url, bos_location)

# Read the metadata table, process every row, and write the enriched rows back
df = spark.sql("select * from sanling.pdf_files")
processed_rdd = df.rdd.map(process_row)
processed_df = spark.createDataFrame(processed_rdd, df.schema)
processed_df.write.mode("overwrite").insertInto("sanling.pdf_files")
spark.sql("select * from sanling.pdf_files").show()
Engine settings
spark.hadoop.edap.endpoint=http://edap.bj.baidubce.com
spark.hadoop.fs.AbstractFileSystem.bos.impl=org.apache.hadoop.fs.bos.BaiduBosFileSystemAdapter
spark.hadoop.hive.metastore.client.class=com.baidubce.edap.catalog.metastore.EdapCatalogHiveClient
spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
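These properties are supplied through the EDAP engine configuration. When the SparkSession is created manually (for example, in a script run outside the managed console where `spark` is not pre-created), the same settings can be applied programmatically; a sketch, assuming the required Delta, BOS, and EDAP dependencies are already on the cluster classpath and with a hypothetical application name:

from pyspark.sql import SparkSession

# Sketch: apply the same engine settings when building the session yourself
spark = (
    SparkSession.builder
    .appName("pdf-ingest")  # hypothetical application name
    .config("spark.hadoop.edap.endpoint", "http://edap.bj.baidubce.com")
    .config("spark.hadoop.fs.AbstractFileSystem.bos.impl", "org.apache.hadoop.fs.bos.BaiduBosFileSystemAdapter")
    .config("spark.hadoop.hive.metastore.client.class", "com.baidubce.edap.catalog.metastore.EdapCatalogHiveClient")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .enableHiveSupport()
    .getOrCreate()
)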