python大資料實踐之一:匯入大批資料檔案到mysql
當有了資料之後,以檔案方式儲存方便從中提取資訊,因此常常將資料檔案儲存到資料庫中。我們透過模擬消費方式產生了10年的消費資料,總共記錄大概300萬行記錄。資料量並不大,但數量多,也就是小檔案多,實際上是不適合使用hdfs來儲存的。因為是儲存在本地客戶端的,為了下一步的應用,我們先將這些資料使用python第三方庫匯入遠端伺服器上的mysql中。
(1)pymysql第三方庫簡介
這是使用python來連線mysql的第三方庫,安裝和使用起來都非常簡單。安裝的時候只需要使用pip工具:
pip install pymysql
安裝完後,使用的時候就在新建py檔案後代碼區寫入:
import pymysql
然後可以使用pymysql物件的connect方法,指定遠端mysql伺服器的ip地址、登入使用者名稱、登入密碼和資料庫名。
clientDB = pymysql。connect(“118。124。174。176”,“root”,“root123”,“dbTest”)
上述clientDB是一個連線物件,呼叫其遊標方法:
cursor = clientDB。cursor()
接下來就可以組裝sql語句,實現CRUD操作,示例如下:
sql = “select * from dbTest。info” #查詢info表中所有記錄
cursor。execute(sql) #傳入sql語句,實現查詢操作
data = cursor。fetall() #將查詢結果物件返回給data列表
print(data) #列印查詢結果
或者實現插入操作:
sql = “insert into info values(1,‘caojianhua’,‘good’)” #查詢info表中所有記錄
cursor。execute(sql) #傳入sql語句,實現新增操作
clientDB。commit() #提交新增操作到伺服器
整個過程可以封裝為一個dbEXE類:
import pymysql
class dbEXE():
def __init__(self): #連線mysql
self。conn = pymysql。connect(“118。124。174。176”,“root”,“root123”,“dbTest”,charset=“utf8”)
self。cursor = self。conn。cursor()
def exeQuery(sql=None): #查詢操作
self。cursor。execute(sql) #傳入sql執行動作
res = self。cursor。fetchall() #取出查詢結果返回給res列表
return res
def exeCUD(sql=None): #刪除、更新、新增操作
self。cursor。execute(sql) #傳入sql執行動作
self。conn。commit() #提交事務處理
print(“執行完畢。。。”)
def exeClose(self): #關閉遊標和連線
self。cursor。close()
self。conn。close()
後面再呼叫時直接傳入sql語句即可完成操作,如查詢操作:
if __name__==‘__main__’:
demo = dbEXE() #例項化
res = demo。exeQuery(sql=“select * from dbTest”) #執行查詢操作
demo。exeClose()
print(res)
(2)使用pymsql載入資料給遠端mysql伺服器
首先我們根據需求在遠端mysql資料庫裡建立好表和欄位屬性:
create table daily_data (date varchar(50),name varchar(50),goods varchar(50),money int);
第一種方法:拼接sql語句,批次執行語句載入
如果是檔案數量少,可以讀取檔案內容然後組裝SQL語句使用insert into語句來插入,如檔案內容如下:
2010-07-06,zhangsan,bean, 4982
2010-07-06,lisi,can, 3390
2010-07-06,zhao,rice, 4438
2010-07-06,zhao,milk, 4411
2010-07-06,liu,milk, 3974
2010-07-06,liu,can, 4143
2010-07-06,zhao,bean, 5630
2010-07-06,topher,can, 4497
2010-07-06,lisi,rice, 3620
2010-07-06,lisi,rice, 3076
2010-07-06,wangwu,can, 1983
2010-07-06,zhangsan,can, 1061
2010-07-06,liu,milk, 3559
2010-07-06,wangwu,bean, 4860
可以看到檔案每行裡四個數值,分隔號為“,”,每行末尾是有換行標記的\n 。那在操作的時候可以先去讀檔案的每一行,然後把一行記錄使用字元逗號分隔,並轉變為列表list。這樣每一行都可以取到4個值,正好對應上述daily_data表中四個欄位,如下操作:
if __name__==‘__main__’:
fileDir = ‘logs’
tableName = ‘daily_data’
ex01 = dbEXE()
with open(file=fileDir + ‘/2010-05-07。log’, mode=‘r’) as f:
for line in f:
item1 = list(line。split(“,”))[0] #獲得每一行第一個數值日期
item2 = list(line。split(“,”))[1] #獲得每一行第二個姓名
item3 = list(line。split(“,”))[2] #獲得每一行第三個商品名
item4 = list(line。split(“,”))[3]。split(‘\n’)[0] #獲得第四個金額
sql = “insert into daily_data values(‘{}’,‘{}’,‘{}’,{})”。format(item1, item2, item3, item4)
ex01。exeCUD(sql) #一行一行的插入到資料庫中
上述實現方式是讀取一行就往資料庫裡插入一行,這樣做速度肯定很慢。按照批次載入的方式,可以先將所有的記錄拼接起來,即:
sql= “insert into tablename value (記錄1) , (記錄2),(記錄3) ”
這樣速度應該會快很多,下面我們將sql拼接來實現一下:
if __name__==‘__main__’:
fileDir = ‘logs’
tableName = ‘daily_data’
ex01 = dbEXE()
with open(file=fileDir + ‘/2010-05-07。log’, mode=‘r’) as f:
strSQL = ‘’
for line in f:
item1 = list(line。split(“,”))[0]
item2 = list(line。split(“,”))[1]
item3 = list(line。split(“,”))[2]
item4 = list(line。split(“,”))[3]。split(‘\n’)[0]
strSQL+=“(‘{}’,‘{}’,‘{}’,{}),”。format(item1,item2,item3,item4) #字串拼接組裝各行記錄
sql=strSQL[:-1] #去除最後一個逗號獲得所有行記錄的拼接
sql = “insert into daily_data values ” + sql #完整的sql語句
ex01。exeCUD(sql=sql)
ex01。exeClose()
上述sql拼接後效果如下:
(‘2010-05-07’,‘bindu’,‘pop’, 2631),(‘2010-05-07’,‘liu’,‘can’, 3709),(‘2010-05-07’,‘bindu’,‘bean’, 2599)
可以來統計一下這樣載入一個檔案1000行記錄所耗時:
if __name__==‘__main__’:
fileDir = ‘logs’
ex01 = dbEXE()
time1 = datetime。now() #記錄起始時間
with open(file=fileDir + ‘/2010-05-07。log’, mode=‘r’) as f:
strSQL = ‘’
for line in f:
item1 = list(line。split(“,”))[0]
item2 = list(line。split(“,”))[1]
item3 = list(line。split(“,”))[2]
item4 = list(line。split(“,”))[3]。split(‘\n’)[0]
strSQL+=“(‘{}’,‘{}’,‘{}’,{}),”。format(item1,item2,item3,item4)
sql=strSQL[:-1]
sql = “insert into daily_data values”+ sql
ex01。exeCUD(sql)
ex01。exeClose()
time2= datetime。now() #記錄結束時間
print(“載入開始時間:”,time1) #列印開始時間
print(“載入結束時間:”,time2) #列印結束時間
最終輸出結果為:
2020-04-20 14:00:01。986527
2020-04-20 14:00:03。019927
耗時1。1秒左右,因為是字串拼接,所以速度還是令人滿意的。如果我們有多個類似的小檔案,我們這樣處理也是可以的,主要耗時也就是字串拼接裡。
如果出現下面的報錯:
執行SQL檔案報錯:1153 - Got a packet bigger than ‘max_allowed_packet’ bytes
原因是mysql預設單個數據包最大為10M,如果單次載入檔案過大,就會報錯。此時可以回到mysql的配置檔案裡,即/etc/mysql/mysql。conf。d/mysqld。cnf編輯這個mysqld。cnf檔案,找到這個引數,將其修改成合適的大小即可:
max_allowed_packet =128M
如果是多個檔案,可以在載入時設定步長來載入,如20個檔案、50個檔案載入一次,也就是按一定步長拼接sql語句,載入完一部分後繼續載入剩下的,這樣速度也非常快:
number = 0 #計數
strSQL= ‘’ #SQL語句初始為空
for list1 in os。listdir(fileDir):
with open(file=fileDir + ‘/’ + list1, mode=‘r’) as f:
for line in f:
item1 = list(line。split(“,”))[0]
item2 = list(line。split(“,”))[1]
item3 = list(line。split(“,”))[2]
item4 = list(line。split(“,”))[3]。split(‘\n’)[0]
strSQL += “(‘{}’,‘{}’,‘{}’,{}),”。format(item1, item2, item3, item4)
number +=1 #計數增1
if number % n ==0: #當計數增至n步長數時
sql = strSQL[:-1] #擷取SQL語句,把最後那個逗號去掉
sql = “insert into goods values ” +sql #拼接為插入SQL語句
self。exeCUD(sql) #執行插入操作
strSQL = ‘’ #執行完成後清空SQL字串,重置為空
print(“the {} files have been loading into mysql, yeah! 。。。”。format(number))
time。sleep(5) #時間設定5秒進行一次
第二種方法:以檔案匯入方式載入
本次實踐共有10年*365估計在3650個檔案,每個檔案接近30k左右資料,均存放在logs目錄下,以日期方式命名,如2010-05-22。log。
由於是採用外部客戶端檔案匯入mysql伺服器,因此在實施的時候有兩個許可權控制的需要注意。
第一個,如果報錯為:
ERROR 1290 (HY000): The MySQL server is running with the ——secure-file-priv option so it cannot execute this statement
就需要在mysql的配置檔案,即/etc/mysql/mysql。conf。d/mysqld。cnf編輯這個mysqld。cnf檔案,新增一行:
secure-file-priv =
因為是遠端傳輸,所以後面空著就行,不用給路徑。
第二個,如果出現如下錯誤:
ERROR 1148: The used command is not allowed with this MySQL version
這是mysql安全設定中的引數,即原則上不允許外部檔案匯入。
此時如果在mysql伺服器端,可以在mysql登入後命令行裡輸入:
mysql> set global local_infile=1;
Query OK, 0 rows affected (0。00 sec)
如果使用客戶端,如pymsql,就需要在連線物件建立時加入這個引數:
client = pymysql。connect(“192。168。58。172”,“root”,“root123”,“supermarket”,local_infile =1)
然後通用在命令列採用匯入檔案方式載入資料,格式為:
load data [low_priority] [local] infile ‘file_name txt’ [replace | ignore]
into table tbl_name
[fields terminated by‘t’] #指定每行中記錄分隔符
[OPTIONALLY] enclosed by ‘’] #指定每個記錄用符號包圍
[escaped by‘\’ ]] #指定跳脫字元便於區分
[lines terminated by‘n’] #指定行與行之間分隔符,一般為換行
[ignore number lines] #可以忽略的行數,如第一行如果為文字檔案頭,可以忽略
[(col_name, )]
load data infile語句從一個文字檔案中以很高的速度讀入一個表中。使用這個命令之前,mysqld程序(服務)必須已經在執行。為了安全原因,當讀取位於伺服器上的文字檔案時,檔案必須處於資料庫目錄或可被所有人讀取。另外,為了對伺服器上檔案使用load data infile,在伺服器主機上你必須有file的許可權。
上述語法中,如果加入關鍵詞low_priority,那麼MySQL將會等到沒有其他人讀這個表的時候,才把插入資料。如:
load data low_priority infile “/logs/demo。txt” into table demo;
如果指定local關鍵詞,則表明從客戶主機讀檔案。如果local沒指定,檔案必須位於伺服器上。
load data infile “/logs/demo。txt” into table demo; #從伺服器上讀檔案
load data local infile “/logs/demo。txt” into table demo; #從客戶機上讀檔案
另外:replace和ignore關鍵詞控制對現有的唯一鍵記錄的重複的處理。如果你指定replace,新行將代替有相同的唯一鍵值的現有行。如果你指定ignore,跳過有唯一鍵的現有行的重複行的輸入。如果你不指定任何一個選項,當找到重複鍵時,出現一個錯誤,並且文字檔案的餘下部分被忽略。
最終完整的常用格式列舉為:
load data infile “/logs/demo。txt” replace into table demo fields terminated by ‘,’ lines terminated by ‘\n’;
有了上面這些介紹為基礎,下面我們對3000多個檔案採用這種方式匯入:
import pymysql,os,time
client = pymysql。connect(“192。168。58。172”,“root”,“root123”,“supermarket”,local_infile =1)
cursor =client。cursor()
fileDir = ‘logs’
for file in os。listdir(fileDir):
sql= “load data local infile ‘” + fileDir + ’/‘ + file + “’ into table daily_income \
fields terminated by ‘,’ \
lines terminated by ‘\n’ ”
cursor。execute(sql)
client。commit()
time。sleep(2)
client。close()
所以對這種小檔案的處理實際上比較繁瑣耗時的,也可以嘗試一下是否可以先將所有檔案內容儲存到同一個檔案中然後來採用檔案匯入方式,應該速度比這種小檔案匯入更快。