- 论坛徽章:
- 0
|
对Mysql connector代码做了封装,可以直接插入dict对象
[Python]代码- #!/usr/bin/env python
- #-*- coding:utf-8-*-
-
- from mysql import connector
-
- def connect():
- config={
- 'host':'localhost',#默认127.0.0.1
- 'user':'root',
- 'password':'1111111',
- 'port':3306 ,#默认即为3306
- 'database':'test',
- 'charset':'utf8'#默认即为utf8
- }
- try:
- return connector.connect(**config)
- except connector.Error as e:
- print('connect fail! {}'.format(e))
-
- def execSql(cur, sql, data, isCommit = False):
- cur.executemany(sql, data) if isinstance(data, list) else cur.execute(sql, data)
- if isCommit:
- conn.commit()
-
- def insert(conn, tableName, datas):
- sql = 'insert into `{0}` ({1}) values ({2})'
- keys = []
- placeHolders = []
-
- data = datas
- if isinstance(datas, list):
- data = datas[0]
-
- for key in data:
- keys.append(key)
- placeHolders.append('%('+key+')s')
- sql = sql.format(tableName, ','.join(keys), ','.join(placeHolders))
-
- id = -1
- try:
- cur = conn.cursor()
- execSql(cur, sql, datas)
- if not isinstance(datas, list):
- id = cur.lastrowid #返回主键
- conn.commit()
- except connector.Error as e:
- raise e
- finally:
- cur.close()
- return id
-
- def query(conn, sql, datas):
- try:
- cur = conn.cursor()
- cur.execute(sql, datas)
- except connector.Error as e:
- raise e
- return cur
-
- if __name__ == '__main__':
- testDict = {
- 'aa': 1,
- 'bb': 'aaa',
- 'cc': 'bbb',
- 'dd': 'ccc'
- }
- conn = connect()
- print insert(conn, 'test_table', brandDict)
复制代码 |
|