1. 数据库读写
数据库是最常用的通过socket连接的软件,多数时候我们写服务,做分析写算法的数据来源都来自数据库,而结果也往往需要放入数据库.最常见的数据库是关系数据库,像标准库自带的sqlite,常见的postgresql,mysql就是关系数据库他们使用统一的操作语言SQL语言
进行操作,但不同的数据库对SQL语言的支持并不完全一样.而像hive这样的实现了部分SQL语句的数仓也可以看做是这类数据库的一个扩展.
另一类是非关系数据库,那就比较多样了,比较常见的大致3类:
- 以redis为代表的键值数据库
- 以mongodb为代表文档数据库
- 以influxdb为代表的时间序列数库
- 以neo4j为代表的图数据库
这些数据库一般并不通用,而是在特定情境下有较大作用,我会介绍我用过的,没用过的也就不介绍了.
1.1. 关系数据库
关系数据库本身接口几乎是一致的,这边以postgresql
为例介绍,本文测试的pg使用docker部署.
1.1.1. 同步接口的关系数据库
常见的同步接口关系数据库如下:
对应数据库 | 包 |
---|---|
sqlite | sqlite3标准库 |
postgresql | psycopg2 |
mysql | pymysql |
mssql | pymssql |
hive | pyhive |
同步接口的关系数据库都是差不多的使用方法
- 先创建连接
- 创建一个游标
- 使用游标对象的
.execute(sql)
接口写入SQL语句 - 使用连接对象的
.commit()
接口提交sql语句 - 使用游标对象的
.fetchall()
接口获取结果 - 使用连接对象的
.close()
方法关闭连接
import psycopg2 dsn = "host=localhost port=5432 dbname=test user=postgres password=postgres" sql = ''' SELECT column_name, table_name, data_type FROM information_schema.columns WHERE table_schema='public' and table_name='company';''' with psycopg2.connect(dsn) as conn: c = conn.cursor() c.execute('''CREATE TABLE IF NOT EXISTS company (id INT PRIMARY KEY NOT NULL, name TEXT NOT NULL, age INT NOT NULL, address CHAR(50), salary REAL);''') conn.commit() c.execute(sql) result = c.fetchall() print(result)
[('id', 'company', 'integer'), ('name', 'company', 'text'), ('age', 'company', 'integer'), ('address', 'company', 'character'), ('salary', 'company', 'real')]
使用peewee做orm
orm是工程上常用的关系型数据库使用方式,使用orm可以让数据库访问这个动作面相对象,获得的数据以及针对数据的操作更加直观,但同时因为毕竟是一层包装,所以也会损失一些性能,而且因为是面向对象操作所以牺牲了灵活性.因此比较适合在业务逻辑上使用(OLTP),对于数据处理的场景(OLAP),
通常我个人比较喜欢使用peewee这个orm.我常用的特性有:
- 使用数据库的url访问数据库
- 在未知数据库路径配置的情况下使用代理对象建立映射
- 在未知表结构只知道表名的情况下获取表对象
- 在未知表是否存在的情况下安全的建表
- 使用上下文语法定义事务
- 使用迭代器访问多条数据
peewee支持的数据有:
- mysql
- postgresql
- sqlite
使用playhouse.db_url.connect
的schema可以是:
- apsw: APSWDatabase
- mysql: MySQLDatabase
- mysql+pool: PooledMySQLDatabase
- postgres: PostgresqlDatabase
- postgres+pool: PooledPostgresqlDatabase
- postgresext: PostgresqlExtDatabase
- postgresext+pool: PooledPostgresqlExtDatabase
- sqlite: SqliteDatabase
- sqliteext: SqliteExtDatabase
- sqlite+pool: PooledSqliteDatabase
- sqliteext+pool: PooledSqliteExtDatabase
定义表对象
from peewee import Proxy,Model,CharField,DateField db = Proxy() class Person(Model): name = CharField() birthday = DateField() class Meta: database = db # This model uses the "people.db" database.
连接数据库
from playhouse.db_url import connect database = connect("postgres+pool://postgres:postgres@localhost:5432/test")
使用sql语句
with database: # with samh.execute_sql("DESC cartoon") as cursor: # scheme = cursor.fetchall() #names = [i[0] for i in scheme] with database.execute_sql(""" SELECT column_name, table_name, data_type FROM information_schema.columns WHERE table_schema='public' and table_name='company'; """) as cursor: result = cursor.fetchall() print(result)
[('id', 'company', 'integer'), ('name', 'company', 'text'), ('age', 'company', 'integer'), ('address', 'company', 'character'), ('salary', 'company', 'real')]
映射Person并创建表
db.initialize(database) db.create_tables([Person],safe=True) with database: # with samh.execute_sql("DESC cartoon") as cursor: # scheme = cursor.fetchall() #names = [i[0] for i in scheme] with database.execute_sql(""" SELECT column_name, table_name, data_type FROM information_schema.columns WHERE table_schema='public' and table_name='person'; """) as cursor: result = cursor.fetchall() print(result)
[('id', 'person', 'integer'), ('name', 'person', 'character varying'), ('birthday', 'person', 'date')]
未知表结构的情况下获取表对象
from playhouse.reflection import generate_models
COMPANY = generate_models(database).get("company")
[('id', 'company', 'integer'), ('name', 'company', 'text'), ('age', 'company', 'integer'), ('address', 'company', 'character'), ('salary', 'company', 'real')]
0
[('id', 'company', 'integer'), ('name', 'company', 'text'), ('age', 'company', 'integer'), ('address', 'company', 'character'), ('salary', 'company', 'real')]
1
插入多条数据
[('id', 'company', 'integer'), ('name', 'company', 'text'), ('age', 'company', 'integer'), ('address', 'company', 'character'), ('salary', 'company', 'real')]
2
读取多条数据
[('id', 'company', 'integer'), ('name', 'company', 'text'), ('age', 'company', 'integer'), ('address', 'company', 'character'), ('salary', 'company', 'real')]
3
[('id', 'company', 'integer'), ('name', 'company', 'text'), ('age', 'company', 'integer'), ('address', 'company', 'character'), ('salary', 'company', 'real')]
4
1.1.2. 异步接口的关系数据库
常见的异步接口关系数据库如下:
对应数据库 | 包 |
---|---|
sqlite | aiosqlite |
postgresql | aiopg |
mysql | aiomysql |
[('id', 'company', 'integer'), ('name', 'company', 'text'), ('age', 'company', 'integer'), ('address', 'company', 'character'), ('salary', 'company', 'real')]
5
[('id', 'company', 'integer'), ('name', 'company', 'text'), ('age', 'company', 'integer'), ('address', 'company', 'character'), ('salary', 'company', 'real')]
使用peewee_async将peewee变成异步orm
peewee是基于同步接口的,而异步语法具有传染性,如果使用peewee就会阻塞,好在有一个包peewee_async为我们做好了将其异步化的工作,需要注意的是目前这个包默认安装使用的是peewee 2,而要使用peewee3需要指定版本安装,0.6.0a
是一个可以使用的版本
这个包支持的数据库有:
- mysql
- postgresql
使用playhouse.db_url.connect
的schema可以是:
postgres+async
postgres+pool+async
mysql+async
mysql+pool+async
[('id', 'company', 'integer'), ('name', 'company', 'text'), ('age', 'company', 'integer'), ('address', 'company', 'character'), ('salary', 'company', 'real')]
7
[('id', 'company', 'integer'), ('name', 'company', 'text'), ('age', 'company', 'integer'), ('address', 'company', 'character'), ('salary', 'company', 'real')]
8
[('id', 'company', 'integer'), ('name', 'company', 'text'), ('age', 'company', 'integer'), ('address', 'company', 'character'), ('salary', 'company', 'real')]
9
from peewee import Proxy,Model,CharField,DateField db = Proxy() class Person(Model): name = CharField() birthday = DateField() class Meta: database = db # This model uses the "people.db" database.
0
peewee-async的一处bug
至少在在0.6.0a
版本peewee-async
有一处bug,就是无法设置connect_timeout
这个参数无法设置,我们可以为其打个猴子补丁
from peewee import Proxy,Model,CharField,DateField db = Proxy() class Person(Model): name = CharField() birthday = DateField() class Meta: database = db # This model uses the "people.db" database.
1
from peewee import Proxy,Model,CharField,DateField db = Proxy() class Person(Model): name = CharField() birthday = DateField() class Meta: database = db # This model uses the "people.db" database.
2
from peewee import Proxy,Model,CharField,DateField db = Proxy() class Person(Model): name = CharField() birthday = DateField() class Meta: database = db # This model uses the "people.db" database.
3
1.2. 键值对内存数据库Redis
除了传统关系型数据库,业务上最常见的恐怕就是redis了.redis实际上分为两种:
- 单机模式 其默认端口为6379
- 集群模式
这两者在使用上并不完全一样,集群模式无法使用需要全局扫key的操作,比如keys这种.
Redis的命令很多这边不做过多介绍,可以看官方文档.redis支持5种数据结构
- 字符串
- 列表
- 哈希表(python中的字典)
- 集合
- 有序集合
他们具体的操作可以看这个文档
redis因为其带着数据结构所以有不少邪道用法,具体的可以看我的这篇博客
1.2.1. Redis的同步接口
单机版本Redis使用包redis-py来连接,它自带一个连接池.需要注意的是从redis中取出的值时bytes类型
from peewee import Proxy,Model,CharField,DateField db = Proxy() class Person(Model): name = CharField() birthday = DateField() class Meta: database = db # This model uses the "people.db" database.
4
from peewee import Proxy,Model,CharField,DateField db = Proxy() class Person(Model): name = CharField() birthday = DateField() class Meta: database = db # This model uses the "people.db" database.
5
from peewee import Proxy,Model,CharField,DateField db = Proxy() class Person(Model): name = CharField() birthday = DateField() class Meta: database = db # This model uses the "people.db" database.
6
from peewee import Proxy,Model,CharField,DateField db = Proxy() class Person(Model): name = CharField() birthday = DateField() class Meta: database = db # This model uses the "people.db" database.
7
集群版本的redis需要使用redis-py-cluster来访问,需要注意的是目前它依赖于2.0版本的redis-py
from peewee import Proxy,Model,CharField,DateField db = Proxy() class Person(Model): name = CharField() birthday = DateField() class Meta: database = db # This model uses the "people.db" database.
8
1.2.2. Redis异步接口
在异步接口方面redis有两个比较好的包:
aioredis 用的最多的一个包,但目前只支持单机redis
aredis 一个用C包
aredis
封装的异步redis客户端,接口很丰富性能也强,作者是个国人,支持单机redis和集群,但用的人相对少而且由于是个人开发所以更新不算频繁
aioredis
from peewee import Proxy,Model,CharField,DateField db = Proxy() class Person(Model): name = CharField() birthday = DateField() class Meta: database = db # This model uses the "people.db" database.
9
from playhouse.db_url import connect database = connect("postgres+pool://postgres:postgres@localhost:5432/test")
0
from peewee import Proxy,Model,CharField,DateField db = Proxy() class Person(Model): name = CharField() birthday = DateField() class Meta: database = db # This model uses the "people.db" database.
7
aredis
aredis使用StrictRedis
类连接单机redis,使用StrictRedisCluster
连接redis集群,其他的操作都是一样的
from playhouse.db_url import connect database = connect("postgres+pool://postgres:postgres@localhost:5432/test")
2
from playhouse.db_url import connect database = connect("postgres+pool://postgres:postgres@localhost:5432/test")
0
from peewee import Proxy,Model,CharField,DateField db = Proxy() class Person(Model): name = CharField() birthday = DateField() class Meta: database = db # This model uses the "people.db" database.
7
1.3. 时间序列数库influxdb
influxdb是目前最流行的时间序列数据库,它支持类似sql语言InfluxQL的特殊语法进行操作,也可以使用http接口发起请求,因此简单好用.
influxdb默认端口为8086
influxdb的同步接口
influxdb同步接口可以使用包influxdb
from playhouse.db_url import connect database = connect("postgres+pool://postgres:postgres@localhost:5432/test")
5
from playhouse.db_url import connect database = connect("postgres+pool://postgres:postgres@localhost:5432/test")
6
from playhouse.db_url import connect database = connect("postgres+pool://postgres:postgres@localhost:5432/test")
7
influxdb的异步接口
异步接口使用aioinflux它其实只是封装了influxdb的RESTful接口.但个人认为用起来更好用
from playhouse.db_url import connect database = connect("postgres+pool://postgres:postgres@localhost:5432/test")
8
from playhouse.db_url import connect database = connect("postgres+pool://postgres:postgres@localhost:5432/test")
9
from playhouse.db_url import connect database = connect("postgres+pool://postgres:postgres@localhost:5432/test")
6
with database: # with samh.execute_sql("DESC cartoon") as cursor: # scheme = cursor.fetchall() #names = [i[0] for i in scheme] with database.execute_sql(""" SELECT column_name, table_name, data_type FROM information_schema.columns WHERE table_schema='public' and table_name='company'; """) as cursor: result = cursor.fetchall() print(result)
1
1.4. 图数据库ArangoDB
arangodb是一个开源的图数据库,它支持一种类似SQL的语法AQL同时也可以使用RESTful接口请求.
ArangoDB默认端口为8529
,自带一个相当美观好用的web服务,我们可以在其上进行很多操作.
arangodb的同步接口
arangodb只有封装好的同步接口python-arango
with database: # with samh.execute_sql("DESC cartoon") as cursor: # scheme = cursor.fetchall() #names = [i[0] for i in scheme] with database.execute_sql(""" SELECT column_name, table_name, data_type FROM information_schema.columns WHERE table_schema='public' and table_name='company'; """) as cursor: result = cursor.fetchall() print(result)
2
from peewee import Proxy,Model,CharField,DateField db = Proxy() class Person(Model): name = CharField() birthday = DateField() class Meta: database = db # This model uses the "people.db" database.
5
arangodb可以像一般文档数据库一样使用
with database: # with samh.execute_sql("DESC cartoon") as cursor: # scheme = cursor.fetchall() #names = [i[0] for i in scheme] with database.execute_sql(""" SELECT column_name, table_name, data_type FROM information_schema.columns WHERE table_schema='public' and table_name='company'; """) as cursor: result = cursor.fetchall() print(result)
4
with database: # with samh.execute_sql("DESC cartoon") as cursor: # scheme = cursor.fetchall() #names = [i[0] for i in scheme] with database.execute_sql(""" SELECT column_name, table_name, data_type FROM information_schema.columns WHERE table_schema='public' and table_name='company'; """) as cursor: result = cursor.fetchall() print(result)
5
with database: # with samh.execute_sql("DESC cartoon") as cursor: # scheme = cursor.fetchall() #names = [i[0] for i in scheme] with database.execute_sql(""" SELECT column_name, table_name, data_type FROM information_schema.columns WHERE table_schema='public' and table_name='company'; """) as cursor: result = cursor.fetchall() print(result)
6
from peewee import Proxy,Model,CharField,DateField db = Proxy() class Person(Model): name = CharField() birthday = DateField() class Meta: database = db # This model uses the "people.db" database.
5
还没有评论,来说两句吧...