Python之路_Day13
生活随笔
收集整理的這篇文章主要介紹了
Python之路_Day13
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
Python之路_Day13_課堂筆記
前期回顧一、redis發布訂閱
二、rabbitMQ原始隊列exchangeex全部轉發ex,關鍵字ex,模糊匹配rpc
三、MySQL
四、Python MySQLpymysqlexcute ? ?執行單條語句,返回受影響的行數excutemany ? ?執行多條語句,返回受影響的行數fetchonefetchallfetchmanyscrolllastrowid
五、SQLAlchemyORM框架db firstcode first====> 我們以后通過類和對象操作數據庫code first1、自定義生成表2、使用類操作表
本節摘要一、ORM
連表一對多多對多二、Paramiko模塊鏈接:堡壘機三、前端HTML
http://www.cnblogs.com/wupeiqi/articles/5699254.html
一、ORM—SQLAlchemy
連表一對多1、創建表,主動知道外鍵2、操作:類:repr單表連表session.query(表1).join(表2).all()#!/usr/bin/env python # -.- coding:utf-8 -.- # By Sandler from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s13", max_overflow=5) Base = declarative_base() class Test(Base): __tablename__ = 'test' nid = Column(Integer,primary_key=True,autoincrement=True) name = Column(String(32)) class Group(Base): __tablename__ = 'group' nid = Column(Integer,primary_key=True,autoincrement=True) caption = Column(String(32)) class User(Base): __tablename__ = 'user' nid = Column(Integer,primary_key=True,autoincrement=True) username = Column(String(32)) group_id = Column(Integer,ForeignKey('group.nid')) def __repr__(self): temp = "%s - %s : %s" %(self.nid,self.username,self.group_id) return temp def init_db(): Base.metadata.create_all(engine) def drop_db(): Base.metadata.drop_all(engine) # init_db() Session = sessionmaker(bind=engine) session = Session() # session.add(Group(caption='dba')) # session.add(Group(caption='ddd')) # session.commit() # session.add_all([ # User(username='alex1',group_id=1), # User(username='alex2',group_id=2) # ]) # session.commit() # 只是獲取用戶 # ret = session.query(User).filter(User.username == 'alex1').all() # print(ret) # ret = session.query(User).all() # obj = ret[0] # print(ret) # print(obj) # print(obj.nid) # print(obj.username) # print(obj.group_id) # ret = session.query(User.username).all() # print(ret) sql = session.query(User,Group).join(Group, isouter=True) print(sql) ret = session.query(User,Group).join(Group, isouter=True).all() print(ret) # select * from user left join group on user.group_id = group.nid 正反向查找#!/usr/bin/env python # -.- coding:utf-8 -.- # By Sandler from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s13", max_overflow=5) Base = declarative_base() class Test(Base): __tablename__ = 'test' nid = Column(Integer,primary_key=True,autoincrement=True) name = Column(String(32)) class Group(Base): __tablename__ = 'group' nid = Column(Integer,primary_key=True,autoincrement=True) caption = Column(String(32)) class User(Base): __tablename__ = 'user' nid = Column(Integer,primary_key=True,autoincrement=True) username = Column(String(32)) group_id = Column(Integer,ForeignKey('group.nid')) group = relationship('Group',backref='uuu') def __repr__(self): temp = "%s - %s : %s" %(self.nid,self.username,self.group_id) return temp def init_db(): Base.metadata.create_all(engine) def drop_db(): Base.metadata.drop_all(engine) # init_db() Session = sessionmaker(bind=engine) session = Session() # session.add(Group(caption='dba')) # session.add(Group(caption='ddd')) # session.commit() # session.add_all([ # User(username='alex1',group_id=1), # User(username='alex2',group_id=2) # ]) # session.commit() # 只是獲取用戶 # ret = session.query(User).filter(User.username == 'alex1').all() # print(ret) # ret = session.query(User).all() # obj = ret[0] # print(ret) # print(obj) # print(obj.nid) # print(obj.username) # print(obj.group_id) # ret = session.query(User.username).all() # print(ret) # sql = session.query(User,Group).join(Group, isouter=True) # print(sql) # ret = session.query(User,Group).join(Group, isouter=True).all() # print(ret) # select * from user left join group on user.group_id = group.nid # 原始方式 # ret = session.query(User.username,Group.caption).join(Group, isouter=True).all() # 新方式(正向查詢) # ret = session.query(User).all() # for obj in ret: # obj代指user表的每一行數據 # obj.group代指group對象 # print(obj.nid,obj.username,obj.group_id,obj.group,obj.group.nid,obj.group.caption) # 原始方式 # ret = session.query(User.username,Group.caption).join(Group,isouter=True).filter(Group.caption == 'DBA').all() # 新方式(反向查詢) # obj = session.query(Group).filter(Group.caption == 'DBA').first() # print(obj.nid) # print(obj.caption) # print(obj.uuu)
多對多:1、創建表——額外的關系表2、filter()==in_(都可以是另外一個查詢)#!/usr/bin/env python # -.- coding:utf-8 -.- # By Sandler from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s13", max_overflow=5) Base = declarative_base() ##############多對多###################### class Host(Base): __tablename__ = 'host' nid = Column(Integer, primary_key=True,autoincrement=True) hostname = Column(String(32)) port = Column(String(32)) ip = Column(String(32)) class HostUser(Base): __tablename__ = 'host_user' nid = Column(Integer, primary_key=True,autoincrement=True) username = Column(String(32)) class HostToHostUser(Base): __tablename__ = 'host_to_host_user' nid = Column(Integer, primary_key=True,autoincrement=True) host_id = Column(Integer,ForeignKey('host.nid')) host_user_id = Column(Integer,ForeignKey('host_user.nid')) def init_db(): Base.metadata.create_all(engine) def drop_db(): Base.metadata.drop_all(engine) # init_db() # 創建表 Session = sessionmaker(bind=engine) session = Session() # session.add_all([ # Host(hostname='c1',port='22',ip='1.1.1.1'), # Host(hostname='c2',port='22',ip='1.1.1.2'), # Host(hostname='c3',port='22',ip='1.1.1.3'), # Host(hostname='c4',port='22',ip='1.1.1.4'), # Host(hostname='c5',port='22',ip='1.1.1.5'), # ]) # session.commit() # session.add_all([ # HostUser(username='root'), # HostUser(username='db'), # HostUser(username='nb'), # HostUser(username='sb'), # ]) # session.commit() # session.add_all([ # HostToHostUser(host_id=1,host_user_id=1), # HostToHostUser(host_id=1,host_user_id=2), # HostToHostUser(host_id=1,host_user_id=3), # HostToHostUser(host_id=2,host_user_id=2), # HostToHostUser(host_id=2,host_user_id=4), # HostToHostUser(host_id=2,host_user_id=3), # ]) # session.commit() # 獲取主機1中所有用戶 host_obj = session.query(Host).filter(Host.hostname == 'c1').first() # host_obj.nid host_2_host_user = session.query(HostToHostUser.host_user_id).filter(HostToHostUser.host_id == host_obj.nid).all() print(host_2_host_user) r = zip(*host_2_host_user) # print(list(r)[0]) users = session.query(HostUser.username).filter(HostUser.nid.in_(list(r)[0])).all() print(users)
3、relationship#!/usr/bin/env python # -.- coding:utf-8 -.- # By Sandler from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s13", max_overflow=5) Base = declarative_base() ##############多對多###################### class Host(Base): __tablename__ = 'host' nid = Column(Integer, primary_key=True,autoincrement=True) hostname = Column(String(32)) port = Column(String(32)) ip = Column(String(32)) class HostUser(Base): __tablename__ = 'host_user' nid = Column(Integer, primary_key=True,autoincrement=True) username = Column(String(32)) class HostToHostUser(Base): __tablename__ = 'host_to_host_user' nid = Column(Integer, primary_key=True,autoincrement=True) host_id = Column(Integer,ForeignKey('host.nid')) host_user_id = Column(Integer,ForeignKey('host_user.nid')) host = relationship('Host',backref = 'h') host_user = relationship('HostUser',backref = 'u') def init_db(): Base.metadata.create_all(engine) def drop_db(): Base.metadata.drop_all(engine) # init_db() # 創建表 Session = sessionmaker(bind=engine) session = Session() # session.add_all([ # Host(hostname='c1',port='22',ip='1.1.1.1'), # Host(hostname='c2',port='22',ip='1.1.1.2'), # Host(hostname='c3',port='22',ip='1.1.1.3'), # Host(hostname='c4',port='22',ip='1.1.1.4'), # Host(hostname='c5',port='22',ip='1.1.1.5'), # ]) # # # session.add_all([ # HostUser(username='root'), # HostUser(username='db'), # HostUser(username='nb'), # HostUser(username='sb'), # ]) # # session.add_all([ # HostToHostUser(host_id=1,host_user_id=1), # HostToHostUser(host_id=1,host_user_id=2), # HostToHostUser(host_id=1,host_user_id=3), # HostToHostUser(host_id=2,host_user_id=2), # HostToHostUser(host_id=2,host_user_id=4), # HostToHostUser(host_id=2,host_user_id=3), # ]) # session.commit() # 獲取主機1中所有用戶 # host_obj = session.query(Host).filter(Host.hostname == 'c1').first() # host_obj.nid # host_2_host_user = session.query(HostToHostUser.host_user_id).filter(HostToHostUser.host_id == host_obj.nid).all() # print(host_2_host_user) # r = zip(*host_2_host_user) # print(list(r)[0]) # users = session.query(HostUser.username).filter(HostUser.nid.in_(list(r)[0])).all() # print(users) host_obj = session.query(Host).filter(Host.hostname == 'c1').first() for item in host_obj.h: print(item.host_user.username)
4、更簡單的方式A ? ?關系(B,AB Table對象)AB == > fkB操作時,簡單
SQLAlchemy總結:1、創建表
2、操作表單表操作
連表操作.join關系:一對多fk,關系多對多多一張表,fk1、關系表:關系2、在某一張表:關系;A:關系,(B,AB)
二、Paramiko模塊使用Paramiko模塊連接遠程服務器并執行命令:#!/usr/bin/env python # -.- coding:utf-8 -.- # By Sandler # paramiko模塊 import paramiko # 普通連接遠程主機并執行一條命令 # 創建SSH對象 ssh = paramiko.SSHClient() # 允許連接不在know_hosts文件中的主機 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 連接服務器 ssh.connect(hostname='192.168.111.2', port=22, username='root', password='111111') # 執行命令 stdin, stdout, stderr = ssh.exec_command('ls -l') # 獲取命令結果 result = stdout.read() print(result) # 關閉連接 ssh.close()
通過Paramiko模塊一次連接服務器實現執行命令,上傳文件等多次操作:#!/usr/bin/env python # -.- coding:utf-8 -.- # By Sandler import paramiko import uuid # 實現一次鏈接執行命令、上傳文件、執行命令 class SSHConnection(object): def __init__(self, host='192.168.111.2', port=22, username='root',pwd='111111'): self.host = host self.port = port self.username = username self.pwd = pwd self.__k = None def run(self): self.connect() pass self.close() def connect(self): transport = paramiko.Transport((self.host,self.port)) transport.connect(username=self.username,password=self.pwd) self.__transport = transport def close(self): self.__transport.close() def cmd(self, command): ssh = paramiko.SSHClient() ssh._transport = self.__transport # 執行命令 stdin, stdout, stderr = ssh.exec_command(command) # 獲取命令結果 result = stdout.read() return result def upload(self,local_path, target_path): # 連接,上傳 sftp = paramiko.SFTPClient.from_transport(self.__transport) # 將location.py 上傳至服務器 /tmp/test.py sftp.put(local_path, target_path) ssh = SSHConnection() ssh.connect() r1 = ssh.cmd('df') print(r1) ssh.upload('s1.py', "/root/s7.py") ssh.close()
堡壘機,通過回車確定輸入內容,并返回結果(一次輸入一條命令返回結果,只可以在Linux上執行):
#!/usr/bin/env python # -.- coding:utf-8 -.- # By Sandler # 堡壘機,通過回車確定輸入內容,并返回結果 import paramiko import sys import os import socket import select import getpass from paramiko.py3compat import u tran = paramiko.Transport(('192.168.111.2', 22,)) tran.start_client() tran.auth_password('root', '111111') # 打開一個通道 chan = tran.open_session() # 獲取一個終端 chan.get_pty() # 激活器 chan.invoke_shell() while True: # 監視用戶輸入和服務器返回數據 # sys.stdin 處理用戶輸入 # chan 是之前創建的通道,用于接收服務器返回信息 readable, writeable, error = select.select([chan, sys.stdin, ], [], [], 1) if chan in readable: try: x = u(chan.recv(1024)) if len(x) == 0: print('\r\n*** EOF\r\n') break sys.stdout.write(x) sys.stdout.flush() except socket.timeout: pass if sys.stdin in readable: inp = sys.stdin.readline() chan.sendall(inp) chan.close() tran.close()
堡壘機,通過回車確定輸入內容,并返回結果(一次輸入一個字符,可以通過tab補全,只可以在Linux上執行):
#!/usr/bin/env python # -.- coding:utf-8 -.- # By Sandler import paramiko import sys import os import socket import select import getpass import termios import tty from paramiko.py3compat import u tran = paramiko.Transport(('192.168.111.2', 22,)) tran.start_client() tran.auth_password('root', '111111') # 打開一個通道 chan = tran.open_session() # 獲取一個終端 chan.get_pty() # 激活器 chan.invoke_shell() # 獲取原tty屬性 oldtty = termios.tcgetattr(sys.stdin) try: # 為tty設置新屬性 # 默認當前tty設備屬性: # 輸入一行回車,執行 # CTRL+C 進程退出,遇到特殊字符,特殊處理。 # 這是為原始模式,不認識所有特殊符號 # 放置特殊字符應用在當前終端,如此設置,將所有的用戶輸入均發送到遠程服務器 tty.setraw(sys.stdin.fileno()) chan.settimeout(0.0) while True: # 監視 用戶輸入 和 遠程服務器返回數據(socket) # 阻塞,直到句柄可讀 r, w, e = select.select([chan, sys.stdin], [], [], 1) if chan in r: try: x = u(chan.recv(1024)) if len(x) == 0: print('\r\n*** EOF\r\n') break sys.stdout.write(x) sys.stdout.flush() except socket.timeout: pass if sys.stdin in r: x = sys.stdin.read(1) if len(x) == 0: break chan.send(x) finally: # 重新設置終端屬性 termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty) chan.close() tran.close()
堡壘機,通過回車確定輸入內容,并返回結果(終極版本,可以在windows和linux上執行):
#!/usr/bin/env python # -.- coding:utf-8 -.- # By Sandler import paramiko import sys import os import socket import getpass from paramiko.py3compat import u # windows does not have termios... try: import termios import tty has_termios = True except ImportError: has_termios = False def interactive_shell(chan): if has_termios: posix_shell(chan) else: windows_shell(chan) def posix_shell(chan): import select oldtty = termios.tcgetattr(sys.stdin) try: tty.setraw(sys.stdin.fileno()) tty.setcbreak(sys.stdin.fileno()) chan.settimeout(0.0) log = open('handle.log', 'a+', encoding='utf-8') flag = False temp_list = [] while True: r, w, e = select.select([chan, sys.stdin], [], []) if chan in r: try: x = u(chan.recv(1024)) if len(x) == 0: sys.stdout.write('\r\n*** EOF\r\n') break if flag: if x.startswith('\r\n'): pass else: temp_list.append(x) flag = False sys.stdout.write(x) sys.stdout.flush() except socket.timeout: pass if sys.stdin in r: x = sys.stdin.read(1) import json if len(x) == 0: break if x == '\t': flag = True else: temp_list.append(x) if x == '\r': log.write(''.join(temp_list)) log.flush() temp_list.clear() chan.send(x) finally: termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty) def windows_shell(chan): import threading sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n") def writeall(sock): while True: data = sock.recv(256) if not data: sys.stdout.write('\r\n*** EOF ***\r\n\r\n') sys.stdout.flush() break sys.stdout.write(data) sys.stdout.flush() writer = threading.Thread(target=writeall, args=(chan,)) writer.start() try: while True: d = sys.stdin.read(1) if not d: break chan.send(d) except EOFError: # user hit ^Z or F6 pass def run(): default_username = getpass.getuser() username = input('Username [%s]: ' % default_username) if len(username) == 0: username = default_username hostname = input('Hostname: ') if len(hostname) == 0: print('*** Hostname required.') sys.exit(1) tran = paramiko.Transport((hostname, 22,)) tran.start_client() default_auth = "p" auth = input('Auth by (p)assword or (r)sa key[%s] ' % default_auth) if len(auth) == 0: auth = default_auth if auth == 'r': default_path = os.path.join(os.environ['HOME'], '.ssh', 'id_rsa') path = input('RSA key [%s]: ' % default_path) if len(path) == 0: path = default_path try: key = paramiko.RSAKey.from_private_key_file(path) except paramiko.PasswordRequiredException: password = getpass.getpass('RSA key password: ') key = paramiko.RSAKey.from_private_key_file(path, password) tran.auth_publickey(username, key) else: pw = getpass.getpass('Password for %s@%s: ' % (username, hostname)) tran.auth_password(username, pw) # 打開一個通道 chan = tran.open_session() # 獲取一個終端 chan.get_pty() # 激活器 chan.invoke_shell() interactive_shell(chan) chan.close() tran.close() if __name__ == '__main__': run()
鏈接:堡壘機
來自為知筆記(Wiz)
前期回顧一、redis發布訂閱
二、rabbitMQ原始隊列exchangeex全部轉發ex,關鍵字ex,模糊匹配rpc
三、MySQL
四、Python MySQLpymysqlexcute ? ?執行單條語句,返回受影響的行數excutemany ? ?執行多條語句,返回受影響的行數fetchonefetchallfetchmanyscrolllastrowid
五、SQLAlchemyORM框架db firstcode first====> 我們以后通過類和對象操作數據庫code first1、自定義生成表2、使用類操作表
本節摘要一、ORM
連表一對多多對多二、Paramiko模塊鏈接:堡壘機三、前端HTML
http://www.cnblogs.com/wupeiqi/articles/5699254.html
一、ORM—SQLAlchemy
連表一對多1、創建表,主動知道外鍵2、操作:類:repr單表連表session.query(表1).join(表2).all()
多對多:1、創建表——額外的關系表2、filter()==in_(都可以是另外一個查詢)
3、relationship
4、更簡單的方式A ? ?關系(B,AB Table對象)AB == > fkB操作時,簡單
SQLAlchemy總結:1、創建表
2、操作表單表操作
連表操作.join關系:一對多fk,關系多對多多一張表,fk1、關系表:關系2、在某一張表:關系;A:關系,(B,AB)
二、Paramiko模塊使用Paramiko模塊連接遠程服務器并執行命令:
通過Paramiko模塊一次連接服務器實現執行命令,上傳文件等多次操作:
堡壘機,通過回車確定輸入內容,并返回結果(一次輸入一條命令返回結果,只可以在Linux上執行):
堡壘機,通過回車確定輸入內容,并返回結果(一次輸入一個字符,可以通過tab補全,只可以在Linux上執行):
堡壘機,通過回車確定輸入內容,并返回結果(終極版本,可以在windows和linux上執行):
鏈接:堡壘機
來自為知筆記(Wiz)
轉載于:https://www.cnblogs.com/sandler613/p/5744659.html
與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的Python之路_Day13的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 网页分享URL
- 下一篇: websocket python爬虫_p