sqlhelper.py 4.9 KB
import asyncio
import os
import threading

from sqlalchemy import Column, Integer, String
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import SingletonThreadPool

from conf.settings import user_db_path

Base = declarative_base()

engine = create_engine('sqlite:///{}'.format(os.path.join(user_db_path, "userlist.db")),
                       poolclass=SingletonThreadPool,
                       connect_args={'check_same_thread': False})

Session = sessionmaker(bind=engine)
session = Session()


class Status():
    OFFLINE = 0
    ONLINE = 1
    LOGGINE = 2
    FAILED = 3


class Config(Base):
    __tablename__ = 'config'
    id = Column(Integer, primary_key=True, autoincrement=True)
    key = Column(String(50), index=True, nullable=False, unique=True)
    value = Column(String(256))

    @staticmethod
    def get(key, value_func=None):
        conf = session.query(Config).filter_by(key=key).first()
        if not conf:
            if value_func:
                conf = Config(key=key, value=value_func())
                session.add(conf)
                session.commit()
                return conf.value
            return None
        return conf.value

    @staticmethod
    def set(key, value):
        conf = session.query(Config).filter_by(key=key).first()
        if not conf:
            conf = Config(key=key, value=value)
            session.add(conf)
            session.commit()
            return conf
        else:
            conf.value = value
            session.commit()
        return conf

    def __repr__(self):
        return "Config(key={}, value={})".format(self.key, self.value)


class UserList(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True, autoincrement=True)
    email = Column(String(50), index=True, nullable=False, unique=True)
    password = Column(String(50), nullable=False)
    cookie = Column(String(512))
    user_agent = Column(String(256))
    fbid = Column(String(20), index=True)
    status = Column(Integer, default=0, nullable=False, index=True)

    # proxy = Column(String(256))

    def __repr__(self):
        return "User(id={}, email={}, password={}, cookie={}, fbid={}, status={})" \
            .format(self.id, self.email, self.password, len(self.cookie) if self.cookie else None, self.fbid,
                    self.status)

    def format_cookie(self):
        if self.cookie:
            return dict([tuple(sub.split("=")) for sub in self.cookie.split('; ')])
        else:
            return {}

    def set(self, **kwargs):
        for k, v in kwargs.items():
            setattr(self, k, v)
        session.commit()
        return self

    @staticmethod
    def update(**kwargs):
        unique = tuple(kwargs)[0]
        if unique == 'email':
            user_ = session.query(UserList).filter_by(email=kwargs.get(unique)).first()
        elif unique == 'fbid':
            user_ = session.query(UserList).filter_by(fbid=kwargs.get(unique)).first()
        else:
            raise BaseException("条件不对")
        kwargs.pop(unique)
        for k, v in kwargs.items():
            setattr(user_, k, v)
        session.commit()
        return user_

    @staticmethod
    def insert(**kwargs):
        u = UserList(**kwargs)
        session.add(u)
        session.commit()
        return u

    @staticmethod
    def get(**kwargs):
        unique = tuple(kwargs)[0]
        if unique == 'email':
            user_ = session.query(UserList).filter_by(email=kwargs.get(unique)).first()
        elif unique == 'fbid':
            user_ = session.query(UserList).filter_by(fbid=kwargs.get(unique)).first()
        else:
            raise BaseException("条件不对")

        return user_

    @staticmethod
    def remove(**kwargs):
        unique = tuple(kwargs)[0]
        if unique == 'email':
            user_ = session.query(UserList).filter_by(email=kwargs.get(unique)).first()
        elif unique == 'fbid':
            user_ = session.query(UserList).filter_by(fbid=kwargs.get(unique)).first()
        else:
            raise BaseException("条件不对")
        if user_:
            session.delete(user_)
            session.commit()
        else:
            raise BaseException("用户不存在")

    @staticmethod
    def all() -> list:
        users = session.query(UserList).all()
        return users

    @staticmethod
    def query(**kwargs) -> list:
        users = session.query(UserList).filter_by(**kwargs).all()
        return users


# Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)

if __name__ == '__main__':
    print(UserList.all())
    # def tes(email):
    #     u = UserList.get(email=email)
    #     u.set(status=3)
    #     print(u)
    # threading.Thread(target=tes, args=('[email protected]',)).start()
    # threading.Thread(target=tes, args=('[email protected]',)).start()
    pass