Chinaunix
标题:
sqlalchemy的session
[打印本页]
作者:
mseaspring
时间:
2010-02-07 17:48
标题:
sqlalchemy的session
#! /usr/bin/env python#coding=utf-8#sqlalchemy的session用法
from sqlalchemy import *from sqlalchemy.orm import sessionmakerfrom sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class User(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String) fullname = Column(String) password = Column(String)
def __init__(self,name, fullname, password): self.name = name self.fullname = fullname self.password = password def __repr__(self): return "" % (self.name, self.fullname, self.password)
engine = create_engine("sqlite:///test.db",echo=True)
Session = sessionmaker(bind=engine)session = Session()#也可以通过Session.configure(bind=engine)实现ed_user = User('ed', 'Ed Jones', 'edspassword')
#此时数据还未添加,直到执行flush或query时候session.add(ed_user)
session.add_all([ User('wendy', 'Wendy Williams', 'foobar'), User('mary', 'Mary Contrary', 'xxg527'), User('fred', 'Fred Flinstone', 'blah')])
#查询第一个符合条件的our_user = session.query(User).filter_by(name='ed').first() session.query(User).filter(User.name.in_(['mary', 'wendy'])).all()
print our_user.fullnamesession.commit()
for instance in session.query(User).order_by(User.id): print instance.name, instance.fullname
for name, fullname in session.query(User.name, User.fullname): print name,fullname
for row in session.query(User, User.name).all(): print row.User,row.name for u in session.query(User).order_by(User.id)[1:3]: print u
for name, in session.query(User.name).filter_by(fullname='Ed Jones'): print name
for user in session.query(User).filter(User.name=='ed').filter(User.fullname=='Ed Jones'): print user
#使用绑定变量形式#session.query(User).filter("id
#使用整个sql语句#session.query(User).from_statement("SELECT * FROM users where name=:name").params(name='ed').all()
#使用位列#session.query("id", "name", "thenumber12").from_statement("SELECT id, name, 12 as thenumber12 FROM users where name=:name").params(name='ed').all()
#查询个数#session.query(User).filter(User.name.like('%ed')).count()
#from sqlalchemy import func#session.query(func.count(User.name), User.name).group_by(User.name).all()
from sqlalchemy import ForeignKeyfrom sqlalchemy.orm import relation, backref
class Address(Base): __tablename__ = 'addresses' id = Column(Integer, primary_key=True) email_address = Column(String, nullable=False) user_id = Column(Integer, ForeignKey('users.id'))
user = relation(User, backref=backref('addresses', order_by=id))
def __init__(self, email_address): self.email_address = email_address
def __repr__(self): return "" % self.email_address
metadata = Base.metadata
metadata.create_all(engine)
jack = User('jack', 'Jack Bean', 'gjffdd')jack.addresses = [Address(email_address='jack@google.com'), Address(email_address='j25@yahoo.com')]session.add(jack)session.commit()
jack = session.query(User).filter_by(name='jack').one() print jack.addresses
#连接查询for u, a in session.query(User, Address).filter(User.id==Address.user_id).\ filter(Address.email_address=='jack@google.com').all(): print u,a #from sqlalchemy.orm import join#session.query(User).select_from(join(User, Address)).\# filter(Address.email_address=='jack@google.com').all()
#子查询stmt = session.query(Address).filter(Address.email_address != 'j25@yahoo.com').subquery()adalias = aliased(Address, stmt)for user, address in session.query(User, adalias).join((adalias, User.addresses)): print user,address
#查询from sqlalchemy.sql import existsstmt = exists().where(Address.user_id==User.id)for name, in session.query(User.name).filter(stmt): print name
session.delete(jack)session.query(User).filte_by(name='jack').count()#from sqlalchemy.orm import clear_mappers#clear_mappers()
本文来自ChinaUnix博客,如果查看原文请点:
http://blog.chinaunix.net/u1/55091/showart_2179499.html
欢迎光临 Chinaunix (http://bbs.chinaunix.net/)
Powered by Discuz! X3.2