日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

Python之路_Day13

發布時間:2024/9/5 python 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Python之路_Day13 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
Python之路_Day13_課堂筆記
前期回顧一、redis發布訂閱
二、rabbitMQ:原始隊列;exchange(ex:全部轉發;ex:關鍵字;ex:模糊匹配);rpc
三、MySQL
四、Python MySQL:pymysql —— execute:執行單條語句,返回受影響的行數;executemany:執行多條語句,返回受影響的行數;fetchone、fetchall、fetchmany、scroll、lastrowid
五、SQLAlchemy:ORM框架;db first / code first ====> 我們以后通過類和對象操作數據庫;code first:1、自定義生成表;2、使用類操作表

本節摘要一、ORM
連表一對多多對多二、Paramiko模塊鏈接:堡壘機三、前端HTML
http://www.cnblogs.com/wupeiqi/articles/5699254.html

一、ORM—SQLAlchemy
連表:一對多。1、創建表,主動指定外鍵;2、操作:類:__repr__;單表;連表:session.query(表1).join(表2).all()
  • #!/usr/bin/env python
  • # -.- coding:utf-8 -.-
  • # By Sandler
  • from sqlalchemy import create_engine
  • from sqlalchemy.ext.declarative import declarative_base
  • from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
  • from sqlalchemy.orm import sessionmaker, relationship
  # Engine for the local MySQL database "s13", reached through the pymysql driver.
  engine = create_engine(
      "mysql+pymysql://root:123@127.0.0.1:3306/s13",
      max_overflow=5,
  )
  Base = declarative_base()


  class Test(Base):
      """Standalone demo table; nothing else references it."""

      __tablename__ = 'test'

      nid = Column(Integer, primary_key=True, autoincrement=True)
      name = Column(String(32))


  class Group(Base):
      """The "one" side of the one-to-many example."""

      __tablename__ = 'group'

      nid = Column(Integer, primary_key=True, autoincrement=True)
      caption = Column(String(32))


  class User(Base):
      """The "many" side: each row points at a group through ``group_id``."""

      __tablename__ = 'user'

      nid = Column(Integer, primary_key=True, autoincrement=True)
      username = Column(String(32))
      # Foreign key declared explicitly against group.nid.
      group_id = Column(Integer, ForeignKey('group.nid'))

      def __repr__(self):
          """Render the row as ``nid - username : group_id``."""
          return "%s - %s : %s" % (self.nid, self.username, self.group_id)


  def init_db():
      """Create the tables for every model registered on ``Base``."""
      Base.metadata.create_all(engine)


  def drop_db():
      """Drop the tables for every model registered on ``Base``."""
      Base.metadata.drop_all(engine)
  # Uncomment and run once to create the tables.
  # init_db()

  Session = sessionmaker(bind=engine)
  session = Session()

  try:
      # Seed data: two groups, then two users that reference them.
      # session.add(Group(caption='dba'))
      # session.add(Group(caption='ddd'))
      # session.commit()
      # session.add_all([
      #     User(username='alex1', group_id=1),
      #     User(username='alex2', group_id=2)
      # ])
      # session.commit()

      # Fetch users only.
      # ret = session.query(User).filter(User.username == 'alex1').all()
      # print(ret)
      # ret = session.query(User).all()
      # obj = ret[0]
      # print(ret)
      # print(obj)
      # print(obj.nid)
      # print(obj.username)
      # print(obj.group_id)
      # ret = session.query(User.username).all()
      # print(ret)

      # Build the LEFT OUTER JOIN once: printing the Query shows the SQL it
      # generates, and the same object is then executed instead of being
      # rebuilt from scratch.
      # Intended SQL: select * from user left join group on user.group_id = group.nid
      sql = session.query(User, Group).join(Group, isouter=True)
      print(sql)
      ret = sql.all()
      print(ret)
  finally:
      # Release the session's connection even when the query raises.
      session.close()
  • 正反向查找
  • #!/usr/bin/env python
  • # -.- coding:utf-8 -.-
  • # By Sandler
  • from sqlalchemy import create_engine
  • from sqlalchemy.ext.declarative import declarative_base
  • from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
  • from sqlalchemy.orm import sessionmaker, relationship
  # Engine for the local MySQL database "s13", reached through the pymysql driver.
  engine = create_engine(
      "mysql+pymysql://root:123@127.0.0.1:3306/s13",
      max_overflow=5,
  )
  Base = declarative_base()


  class Test(Base):
      """Standalone demo table; nothing else references it."""

      __tablename__ = 'test'

      nid = Column(Integer, primary_key=True, autoincrement=True)
      name = Column(String(32))


  class Group(Base):
      """The "one" side; gains a ``uuu`` attribute from the backref on User.group."""

      __tablename__ = 'group'

      nid = Column(Integer, primary_key=True, autoincrement=True)
      caption = Column(String(32))


  class User(Base):
      """The "many" side, with an object-level link to its group."""

      __tablename__ = 'user'

      nid = Column(Integer, primary_key=True, autoincrement=True)
      username = Column(String(32))
      group_id = Column(Integer, ForeignKey('group.nid'))
      # Forward lookup: user.group; reverse lookup: group.uuu.
      group = relationship('Group', backref='uuu')

      def __repr__(self):
          """Render the row as ``nid - username : group_id``."""
          return "%s - %s : %s" % (self.nid, self.username, self.group_id)


  def init_db():
      """Create the tables for every model registered on ``Base``."""
      Base.metadata.create_all(engine)


  def drop_db():
      """Drop the tables for every model registered on ``Base``."""
      Base.metadata.drop_all(engine)
  • # Uncomment and run once to create the tables.
  • # init_db()
  • Session = sessionmaker(bind=engine)
  • session = Session()
  • # Seed data: two groups, then two users that reference them.
  • # session.add(Group(caption='dba'))
  • # session.add(Group(caption='ddd'))
  • # session.commit()
  • # session.add_all([
  • # User(username='alex1',group_id=1),
  • # User(username='alex2',group_id=2)
  • # ])
  • # session.commit()
  • # Fetch users only
  • # ret = session.query(User).filter(User.username == 'alex1').all()
  • # print(ret)
  • # ret = session.query(User).all()
  • # obj = ret[0]
  • # print(ret)
  • # print(obj)
  • # print(obj.nid)
  • # print(obj.username)
  • # print(obj.group_id)
  • # ret = session.query(User.username).all()
  • # print(ret)
  • # sql = session.query(User,Group).join(Group, isouter=True)
  • # print(sql)
  • # ret = session.query(User,Group).join(Group, isouter=True).all()
  • # print(ret)
  • # select * from user left join group on user.group_id = group.nid
  • # Original approach: explicit join
  • # ret = session.query(User.username,Group.caption).join(Group, isouter=True).all()
  • # New approach (forward lookup through User.group)
  • # ret = session.query(User).all()
  • # for obj in ret:
  • #     obj stands for one row of the user table
  • #     obj.group stands for the related Group object
  • #     print(obj.nid,obj.username,obj.group_id,obj.group,obj.group.nid,obj.group.caption)
  • # Original approach: explicit join plus filter
  • # ret = session.query(User.username,Group.caption).join(Group,isouter=True).filter(Group.caption == 'DBA').all()
  • # New approach (reverse lookup through the 'uuu' backref)
  • # obj = session.query(Group).filter(Group.caption == 'DBA').first()
  • # print(obj.nid)
  • # print(obj.caption)
  • # print(obj.uuu)

  • 多對多:1、創建表——額外的關系表;2、filter() 中的 == 和 in_()(都可以是另外一個查詢)
  • #!/usr/bin/env python
  • # -.- coding:utf-8 -.-
  • # By Sandler
  • from sqlalchemy import create_engine
  • from sqlalchemy.ext.declarative import declarative_base
  • from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
  • from sqlalchemy.orm import sessionmaker, relationship
  # Engine for the local MySQL database "s13", reached through the pymysql driver.
  engine = create_engine(
      "mysql+pymysql://root:123@127.0.0.1:3306/s13",
      max_overflow=5,
  )
  Base = declarative_base()


  # ------------------------------ many-to-many ------------------------------
  class Host(Base):
      """A machine that users can log in to."""

      __tablename__ = 'host'

      nid = Column(Integer, primary_key=True, autoincrement=True)
      hostname = Column(String(32))
      port = Column(String(32))
      ip = Column(String(32))


  class HostUser(Base):
      """A login account name."""

      __tablename__ = 'host_user'

      nid = Column(Integer, primary_key=True, autoincrement=True)
      username = Column(String(32))


  class HostToHostUser(Base):
      """Link table: one row per (host, account) pair."""

      __tablename__ = 'host_to_host_user'

      nid = Column(Integer, primary_key=True, autoincrement=True)
      host_id = Column(Integer, ForeignKey('host.nid'))
      host_user_id = Column(Integer, ForeignKey('host_user.nid'))


  def init_db():
      """Create the tables for every model registered on ``Base``."""
      Base.metadata.create_all(engine)


  def drop_db():
      """Drop the tables for every model registered on ``Base``."""
      Base.metadata.drop_all(engine)
  # Uncomment and run once to create the tables.
  # init_db()

  Session = sessionmaker(bind=engine)
  session = Session()

  try:
      # Seed data: hosts, accounts, then the rows linking them.
      # session.add_all([
      #     Host(hostname='c1', port='22', ip='1.1.1.1'),
      #     Host(hostname='c2', port='22', ip='1.1.1.2'),
      #     Host(hostname='c3', port='22', ip='1.1.1.3'),
      #     Host(hostname='c4', port='22', ip='1.1.1.4'),
      #     Host(hostname='c5', port='22', ip='1.1.1.5'),
      # ])
      # session.commit()
      # session.add_all([
      #     HostUser(username='root'),
      #     HostUser(username='db'),
      #     HostUser(username='nb'),
      #     HostUser(username='sb'),
      # ])
      # session.commit()
      # session.add_all([
      #     HostToHostUser(host_id=1, host_user_id=1),
      #     HostToHostUser(host_id=1, host_user_id=2),
      #     HostToHostUser(host_id=1, host_user_id=3),
      #     HostToHostUser(host_id=2, host_user_id=2),
      #     HostToHostUser(host_id=2, host_user_id=4),
      #     HostToHostUser(host_id=2, host_user_id=3),
      # ])
      # session.commit()

      # Fetch every account attached to host "c1".
      host_obj = session.query(Host).filter(Host.hostname == 'c1').first()
      if host_obj is None:
          # first() returns None when no row matches (e.g. before seeding);
          # treat that as "no link rows" instead of failing on host_obj.nid.
          host_2_host_user = []
      else:
          host_2_host_user = (
              session.query(HostToHostUser.host_user_id)
              .filter(HostToHostUser.host_id == host_obj.nid)
              .all()
          )
      print(host_2_host_user)

      # Each result row holds a single column, so take element 0 of every row.
      # Unlike list(zip(*rows))[0], this also works when there are no rows.
      r = [row[0] for row in host_2_host_user]

      # Skip the IN (...) query entirely when there is nothing to look up.
      users = session.query(HostUser.username).filter(HostUser.nid.in_(r)).all() if r else []
      print(users)
  finally:
      # Release the session's connection even when a query raises.
      session.close()

  • 3、relationship
  • #!/usr/bin/env python
  • # -.- coding:utf-8 -.-
  • # By Sandler
  • from sqlalchemy import create_engine
  • from sqlalchemy.ext.declarative import declarative_base
  • from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
  • from sqlalchemy.orm import sessionmaker, relationship
  # Engine for the local MySQL database "s13", reached through the pymysql driver.
  engine = create_engine(
      "mysql+pymysql://root:123@127.0.0.1:3306/s13",
      max_overflow=5,
  )
  Base = declarative_base()


  # ------------------------------ many-to-many ------------------------------
  class Host(Base):
      """A machine; gains an ``h`` attribute from the backref on HostToHostUser.host."""

      __tablename__ = 'host'

      nid = Column(Integer, primary_key=True, autoincrement=True)
      hostname = Column(String(32))
      port = Column(String(32))
      ip = Column(String(32))


  class HostUser(Base):
      """A login account; gains a ``u`` attribute from the backref on HostToHostUser.host_user."""

      __tablename__ = 'host_user'

      nid = Column(Integer, primary_key=True, autoincrement=True)
      username = Column(String(32))


  class HostToHostUser(Base):
      """Link table that also exposes both ends as objects."""

      __tablename__ = 'host_to_host_user'

      nid = Column(Integer, primary_key=True, autoincrement=True)
      host_id = Column(Integer, ForeignKey('host.nid'))
      host_user_id = Column(Integer, ForeignKey('host_user.nid'))

      # link.host / link.host_user go forward; host.h / host_user.u come back.
      host = relationship('Host', backref='h')
      host_user = relationship('HostUser', backref='u')


  def init_db():
      """Create the tables for every model registered on ``Base``."""
      Base.metadata.create_all(engine)


  def drop_db():
      """Drop the tables for every model registered on ``Base``."""
      Base.metadata.drop_all(engine)
  # Uncomment and run once to create the tables.
  # init_db()

  Session = sessionmaker(bind=engine)
  session = Session()

  try:
      # Seed data: hosts, accounts, then the rows linking them.
      # session.add_all([
      #     Host(hostname='c1', port='22', ip='1.1.1.1'),
      #     Host(hostname='c2', port='22', ip='1.1.1.2'),
      #     Host(hostname='c3', port='22', ip='1.1.1.3'),
      #     Host(hostname='c4', port='22', ip='1.1.1.4'),
      #     Host(hostname='c5', port='22', ip='1.1.1.5'),
      # ])
      #
      #
      # session.add_all([
      #     HostUser(username='root'),
      #     HostUser(username='db'),
      #     HostUser(username='nb'),
      #     HostUser(username='sb'),
      # ])
      #
      # session.add_all([
      #     HostToHostUser(host_id=1, host_user_id=1),
      #     HostToHostUser(host_id=1, host_user_id=2),
      #     HostToHostUser(host_id=1, host_user_id=3),
      #     HostToHostUser(host_id=2, host_user_id=2),
      #     HostToHostUser(host_id=2, host_user_id=4),
      #     HostToHostUser(host_id=2, host_user_id=3),
      # ])
      # session.commit()

      # Earlier approach, kept for comparison: collect the ids by hand.
      # host_obj = session.query(Host).filter(Host.hostname == 'c1').first()
      # host_obj.nid
      # host_2_host_user = session.query(HostToHostUser.host_user_id).filter(HostToHostUser.host_id == host_obj.nid).all()
      # print(host_2_host_user)
      # r = zip(*host_2_host_user)
      # print(list(r)[0])
      # users = session.query(HostUser.username).filter(HostUser.nid.in_(list(r)[0])).all()
      # print(users)

      # Relationship approach: walk host -> link rows (backref "h") -> account.
      host_obj = session.query(Host).filter(Host.hostname == 'c1').first()
      # first() returns None when no row matches (e.g. before seeding);
      # iterate over nothing in that case instead of failing on host_obj.h.
      links = host_obj.h if host_obj is not None else []
      for item in links:
          print(item.host_user.username)
  finally:
      # Release the session's connection even when a query raises.
      session.close()

  • 4、更簡單的方式:A:關系(B,AB Table對象);AB ==> fk;B:操作時,簡單






    SQLAlchemy總結:1、創建表
    2、操作表單表操作
    連表操作.join關系:一對多fk,關系多對多多一張表,fk1、關系表:關系2、在某一張表:關系;A:關系,(B,AB)


    二、Paramiko模塊使用Paramiko模塊連接遠程服務器并執行命令:
  • #!/usr/bin/env python
  • # -.- coding:utf-8 -.-
  • # By Sandler
  • # paramiko模塊
  • import paramiko
  # Plain connection: run one command on a remote host.
  # SSHClient is used as a context manager so the connection is closed even
  # when connect() or exec_command() raises.
  with paramiko.SSHClient() as ssh:
      # Allow connecting to hosts that are not in the known_hosts file.
      # NOTE(review): AutoAddPolicy accepts any host key; acceptable for a lab
      # exercise, but confirm before using this against real servers.
      ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
      # Connect to the server.
      ssh.connect(hostname='192.168.111.2', port=22, username='root', password='111111')
      # Run the command.
      stdin, stdout, stderr = ssh.exec_command('ls -l')
      # Collect the command's standard output.
      result = stdout.read()
      print(result)

  • 通過Paramiko模塊一次連接服務器實現執行命令,上傳文件等多次操作:
  • #!/usr/bin/env python
  • # -.- coding:utf-8 -.-
  • # By Sandler
  • import paramiko
  • import uuid
  # One connection reused for: run a command, upload a file, run a command.
  class SSHConnection(object):
      """Reuse a single paramiko Transport for remote commands and SFTP uploads.

      Call ``connect()`` first, then ``cmd()`` / ``upload()`` as often as
      needed, and finally ``close()``.

      Args:
          host: Address of the remote server.
          port: SSH port on the remote server.
          username: Account used to log in.
          pwd: Password for ``username``.
      """

      def __init__(self, host='192.168.111.2', port=22, username='root', pwd='111111'):
          self.host = host
          self.port = port
          self.username = username
          self.pwd = pwd
          # Assigned by connect(); None means "not connected", which lets
          # close() and the command helpers detect that state explicitly.
          self.__transport = None

      def _connected_transport(self):
          """Return the live transport or raise RuntimeError if not connected."""
          if self.__transport is None:
              raise RuntimeError('not connected; call connect() first')
          return self.__transport

      def run(self):
          """Connect, run the (currently empty) workload, and always disconnect."""
          self.connect()
          try:
              pass  # placeholder for the actual work
          finally:
              self.close()

      def connect(self):
          """Open the transport and authenticate with the stored password."""
          transport = paramiko.Transport((self.host, self.port))
          try:
              transport.connect(username=self.username, password=self.pwd)
          except Exception:
              # Do not leave a half-open transport behind when login fails.
              transport.close()
              raise
          self.__transport = transport

      def close(self):
          """Close the transport; does nothing when not connected."""
          if self.__transport is not None:
              self.__transport.close()
              self.__transport = None

      def cmd(self, command):
          """Run ``command`` on the server and return what it wrote to stdout.

          Raises:
              RuntimeError: if ``connect()`` has not been called.
          """
          transport = self._connected_transport()
          ssh = paramiko.SSHClient()
          # Hand the already-authenticated transport to the client so that
          # exec_command() reuses it instead of opening a new connection.
          ssh._transport = transport
          # Run the command and collect its output.
          stdin, stdout, stderr = ssh.exec_command(command)
          return stdout.read()

      def upload(self, local_path, target_path):
          """Copy ``local_path`` to ``target_path`` on the server over SFTP.

          Raises:
              RuntimeError: if ``connect()`` has not been called.
          """
          transport = self._connected_transport()
          sftp = paramiko.SFTPClient.from_transport(transport)
          try:
              sftp.put(local_path, target_path)
          finally:
              # Close the SFTP session; the shared transport stays open.
              sftp.close()
  ssh = SSHConnection()
  ssh.connect()
  try:
      r1 = ssh.cmd('df')
      print(r1)
      ssh.upload('s1.py', "/root/s7.py")
  finally:
      # Disconnect even when the command or the upload fails.
      ssh.close()

  • 堡壘機,通過回車確定輸入內容,并返回結果(一次輸入一條命令返回結果,只可以在Linux上執行):
  • #!/usr/bin/env python
  • # -.- coding:utf-8 -.-
  • # By Sandler
  • # 堡壘機,通過回車確定輸入內容,并返回結果
  • import paramiko
  • import sys
  • import os
  • import socket
  • import select
  • import getpass
  • from paramiko.py3compat import u
  tran = paramiko.Transport(('192.168.111.2', 22,))
  try:
      tran.start_client()
      tran.auth_password('root', '111111')
      # Open a channel.
      chan = tran.open_session()
      # Request a pseudo-terminal.
      chan.get_pty()
      # Start an interactive shell on the channel.
      chan.invoke_shell()
      while True:
          # Watch both the user's input and the data coming back from the server:
          #   sys.stdin carries what the user types,
          #   chan is the channel opened above and carries the server's replies.
          # The trailing 1 is a timeout in seconds, so select() also returns
          # when neither is ready.
          readable, writeable, error = select.select([chan, sys.stdin, ], [], [], 1)
          if chan in readable:
              try:
                  x = u(chan.recv(1024))
                  if len(x) == 0:
                      print('\r\n*** EOF\r\n')
                      break
                  sys.stdout.write(x)
                  sys.stdout.flush()
              except socket.timeout:
                  pass
          if sys.stdin in readable:
              inp = sys.stdin.readline()
              if not inp:
                  # readline() returns '' once stdin is exhausted. Without
                  # this check select() keeps reporting stdin as readable and
                  # the loop spins forever sending empty strings.
                  break
              chan.sendall(inp)
      chan.close()
  finally:
      # Close the transport on every path, including errors raised above.
      tran.close()

  • 堡壘機,通過回車確定輸入內容,并返回結果(一次輸入一個字符,可以通過tab補全,只可以在Linux上執行):
  • #!/usr/bin/env python
  • # -.- coding:utf-8 -.-
  • # By Sandler
  • import paramiko
  • import sys
  • import os
  • import socket
  • import select
  • import getpass
  • import termios
  • import tty
  • from paramiko.py3compat import u
  • tran = paramiko.Transport(('192.168.111.2', 22,))
  • tran.start_client()
  • tran.auth_password('root', '111111')
  • # Open a channel
  • chan = tran.open_session()
  • # Request a pseudo-terminal
  • chan.get_pty()
  • # Start an interactive shell on the channel
  • chan.invoke_shell()
  • # Save the current tty attributes so they can be restored afterwards
  • oldtty = termios.tcgetattr(sys.stdin)
  • try:
  • # Switch the tty to new attributes.
  • # Default tty behaviour:
  • #   input is delivered one line at a time, when Enter is pressed;
  • #   special characters get special handling, e.g. Ctrl+C exits the process.
  • # Raw mode does not interpret any special characters,
  • # so they are not applied to the local terminal; with this setting all user input is sent to the remote server
  • tty.setraw(sys.stdin.fileno())
  • chan.settimeout(0.0)
  • while True:
  • # Watch both the user's input and the data returned by the remote server (socket)
  • # Wait until a handle is readable; the final argument is a 1 second timeout, so select() also returns when nothing is ready
  • r, w, e = select.select([chan, sys.stdin], [], [], 1)
  • if chan in r:
  • try:
  • x = u(chan.recv(1024))
  • if len(x) == 0:
  • print('\r\n*** EOF\r\n')
  • break
  • sys.stdout.write(x)
  • sys.stdout.flush()
  • except socket.timeout:
  • pass
  • if sys.stdin in r:
  • x = sys.stdin.read(1)
  • if len(x) == 0:
  • break
  • chan.send(x)
  • finally:
  • # Restore the original terminal attributes
  • termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)
  • chan.close()
  • tran.close()

  • 堡壘機,通過回車確定輸入內容,并返回結果(終極版本,可以在windows和linux上執行):
  • #!/usr/bin/env python
  • # -.- coding:utf-8 -.-
  • # By Sandler
  • import paramiko
  • import sys
  • import os
  • import socket
  • import getpass
  • from paramiko.py3compat import u
  • # windows does not have termios...
  • try:
  • import termios
  • import tty
  • has_termios = True
  • except ImportError:
  • has_termios = False
  def interactive_shell(chan):
      """Run the interactive loop that matches the current platform.

      Uses the termios-based loop when the ``termios`` import succeeded and
      the thread-based fallback otherwise.
      """
      shell = posix_shell if has_termios else windows_shell
      shell(chan)
  def posix_shell(chan):
      """Bridge the local terminal to ``chan`` and log each entered line.

      Keystrokes are forwarded to the channel one character at a time and
      everything received from the channel is echoed to stdout. The typed
      characters are buffered and appended to ``handle.log`` whenever the
      user presses Enter. The terminal attributes saved on entry are
      restored on exit.

      Args:
          chan: Channel object providing ``recv``, ``send`` and ``settimeout``.
      """
      import select

      oldtty = termios.tcgetattr(sys.stdin)
      try:
          tty.setraw(sys.stdin.fileno())
          tty.setcbreak(sys.stdin.fileno())
          chan.settimeout(0.0)

          # Set after the user presses Tab: the next chunk from the server is
          # then treated as the completed text and added to the logged line.
          flag = False
          # Characters of the line currently being typed.
          temp_list = []

          # The context manager closes the log file on every exit path.
          with open('handle.log', 'a+', encoding='utf-8') as log:
              while True:
                  r, w, e = select.select([chan, sys.stdin], [], [])
                  if chan in r:
                      try:
                          x = u(chan.recv(1024))
                          if len(x) == 0:
                              sys.stdout.write('\r\n*** EOF\r\n')
                              break
                          if flag:
                              # A reply starting with a line break is not
                              # added to the logged line.
                              if not x.startswith('\r\n'):
                                  temp_list.append(x)
                              flag = False
                          sys.stdout.write(x)
                          sys.stdout.flush()
                      except socket.timeout:
                          pass
                  if sys.stdin in r:
                      x = sys.stdin.read(1)
                      if len(x) == 0:
                          break
                      if x == '\t':
                          flag = True
                      else:
                          temp_list.append(x)
                      if x == '\r':
                          # Enter pressed: flush the buffered line to the log.
                          log.write(''.join(temp_list))
                          log.flush()
                          temp_list.clear()
                      chan.send(x)
      finally:
          termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)
  def windows_shell(chan):
      """Line-buffered fallback loop for platforms without ``termios``.

      A background thread copies everything received from ``chan`` to stdout
      while the calling thread forwards stdin to ``chan`` one character at a
      time until end of input.

      Args:
          chan: Channel object providing ``recv`` and ``send``.
      """
      import threading

      sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n")

      def writeall(sock):
          """Echo data from ``sock`` to stdout until it reports end of stream."""
          while True:
              data = sock.recv(256)
              if not data:
                  sys.stdout.write('\r\n*** EOF ***\r\n\r\n')
                  sys.stdout.flush()
                  break
              # recv() yields bytes, but sys.stdout is a text stream and
              # rejects bytes with TypeError. errors='replace' keeps the loop
              # alive when a multi-byte character is split across two reads.
              if isinstance(data, bytes):
                  data = data.decode('utf-8', errors='replace')
              sys.stdout.write(data)
              sys.stdout.flush()

      writer = threading.Thread(target=writeall, args=(chan,))
      writer.start()

      try:
          while True:
              d = sys.stdin.read(1)
              if not d:
                  break
              chan.send(d)
      except EOFError:
          # user hit ^Z or F6
          pass
  def run():
      """Prompt for login details, open an SSH session and start the shell.

      Asks for a username (defaulting to the current user), a hostname and
      the authentication method: password, or an RSA private key file. It
      then opens a channel with a pseudo-terminal and hands it to
      ``interactive_shell``. Exits with status 1 when no hostname is given.
      """
      default_username = getpass.getuser()
      username = input('Username [%s]: ' % default_username)
      if len(username) == 0:
          username = default_username

      hostname = input('Hostname: ')
      if len(hostname) == 0:
          print('*** Hostname required.')
          sys.exit(1)

      tran = paramiko.Transport((hostname, 22,))
      try:
          tran.start_client()

          default_auth = "p"
          auth = input('Auth by (p)assword or (r)sa key[%s] ' % default_auth)
          if len(auth) == 0:
              auth = default_auth

          if auth == 'r':
              # expanduser('~') resolves the home directory on Windows as
              # well, where the HOME environment variable is usually unset
              # and os.environ['HOME'] raises KeyError.
              default_path = os.path.join(os.path.expanduser('~'), '.ssh', 'id_rsa')
              path = input('RSA key [%s]: ' % default_path)
              if len(path) == 0:
                  path = default_path
              try:
                  key = paramiko.RSAKey.from_private_key_file(path)
              except paramiko.PasswordRequiredException:
                  # The key file is encrypted: ask for its passphrase and retry.
                  password = getpass.getpass('RSA key password: ')
                  key = paramiko.RSAKey.from_private_key_file(path, password)
              tran.auth_publickey(username, key)
          else:
              pw = getpass.getpass('Password for %s@%s: ' % (username, hostname))
              tran.auth_password(username, pw)

          # Open a channel.
          chan = tran.open_session()
          # Request a pseudo-terminal.
          chan.get_pty()
          # Start an interactive shell on the channel.
          chan.invoke_shell()

          interactive_shell(chan)

          chan.close()
      finally:
          # Close the transport on every path, including failed logins.
          tran.close()


  if __name__ == '__main__':
      run()



  • 鏈接:堡壘機














    來自為知筆記(Wiz)

    轉載于:https://www.cnblogs.com/sandler613/p/5744659.html

    與50位技術專家面對面20年技術見證,附贈技術全景圖

    總結

    以上是生活随笔為你收集整理的Python之路_Day13的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。