5位随机验证码
# -*- coding:utf-8 -*-
import random
import string
print(string.ascii_letters) #abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ
print(string.ascii_lowercase) #abcdefghijklmnopqrstuvwxyz
print(string.ascii_uppercase) #ABCDEFGHIJKLMNOPQRSTUVWXYZ
print(string.digits) #0123456789
print(string.hexdigits) #0123456789abcdefABCDEF
print(string.octdigits) #01234567
s = random.sample(string.ascii_letters+string.digits,5)
print(s) #['i', '3', 'E', 'R', 'v']
print(''.join(s)) #i3ERv
终端打印出带颜色的提示
def print_correct(msg):
print('\033[32;1m %s \033[0m' % msg)
def print_error(msg):
print('\033[31;1m %s \033[0m' % msg)
def exit_msg(msg):
exit('\033[31;1m %s \033[0m' % msg)
单例模式
'''绑定到类的方法实现'''
import threading
class Mysql:
__instance_lock = threading.Lock()
__isinstance = None
def __init__(self):
# 每个实例化对象的ip与port都是默认相等的
self.ip = '127.0.0.1'
self.port = 8000
# 由于这个我们操作的内容只跟类有关
# 因此这里用绑定到类的方法
@classmethod
def create(cls):
# 如果静态变量 __isinstance是None的话
if not cls.__isinstance:
with cls.__instance_lock:
obj = cls()
cls.__isinstance = obj
# 最后将静态变量__isinstance return出去!
# 如果以后用create创建相同特性的对象的话
# 就可以用“一个空间”了,大大节省了空间!
return cls.__isinstance
if __name__ == '__main__':
# 用绑定到类的方法create“创建”两个对象
m1 = Mysql.create()
m2 = Mysql.create()
# 比较小两个对象的id
print(id(m1),id(m2))#55308176 55308176
print(m1 is m2)#True
'''重写__new__方法'''
# coding:utf-8
import threading
class Student(object):
__instance_lock = threading.Lock()
__instance = False
def __init__(self,name=None,age=None):
if name:
self.name = name
if age:
self.age = age
def __new__(cls, *args, **kwargs):
if not cls.__instance:
with cls.__instance_lock:
obj = super().__new__(cls)
cls.__instance = obj
return cls.__instance
if __name__ == '__main__':
s1 = Student('whw1',22)
s2 = Student('whw2',24)
s3 = Student('whw3',25)
print(s1,s2,s3)
# <__main__.Student object at 0x1009c6080> <__main__.Student object at 0x1009c6080> <__main__.Student object at 0x1009c6080>
print(s1.name,s2.age)# whw3 25 是最后一个实例化的对象
'''装饰器实现'''
import threading
def singleton(cls):
instances = {}
def _singleton(*args,**kwargs):
if cls not in instances:
instances[cls] = cls(*args,**kwargs)
return instances[cls]
return _singleton
@singleton
class MySQL:
def __init__(self):
self.host = '127.0.0.1'
self.port = 3306
if __name__ == '__main__':
m1 = MySQL()
m2 = MySQL()
m3 = MySQL()
print(m1)
print(m2)
print(m3)
"""
<__main__.MySQL object at 0x10357cef0>
<__main__.MySQL object at 0x10357cef0>
<__main__.MySQL object at 0x10357cef0>
"""
基于模块导入的单例模式
生成器读取大文件的例子
有一个文件 大概有500G,并且只有一行,行之间有分隔符,我们需要把文件内的数据一行一行的读取出来,然后写入数据库里面。
#500G:一行,中间用{|}分开
def myreadlines(f, newline):
buf = ""
while True:
while newline in buf:
pos = buf.index(newline)
yield buf[:pos]
buf = buf[pos + len(newline):]
chunk = f.read(4096)
if not chunk:
#说明已经读到了文件结尾
yield buf
break
buf += chunk
if __name__ == '__main__':
with open("input.txt") as f:
for line in myreadlines(f, "{|}"):
print (line)
生成大文件的秘文
# -*- coding:utf-8 -*-
import os
import hashlib
def md5_pwd(path):
file_size = os.path.getsize(path)
md5 = hashlib.md5()
with open(path,'rb')as f:
while file_size >= 4096:
content = f.read(4096)
md5.update(content)
file_size -= 4096
else:
content = f.read()
if content:
md5.update(content)
return md5.hexdigest()
if __name__ == '__main__':
print(md5_pwd('D:\\0.0电影\暴雪将至.mp4'))
#6a37f182c3501c4d328760818439efa5
堆栈法计算文件夹大小
import os
BASE_DIR = os.path.dirname(os.path.abspath(__name__))
FILE_PATH = os.path.join(BASE_DIR,'whw2')
lis = [FILE_PATH]
count = 0
while lis:
path = lis.pop()
for f in os.listdir(path):
sub_path = os.path.join(path,f)
if os.path.isdir(sub_path):
lis.append(sub_path)
elif os.path.isfile(sub_path):
count += os.path.getsize(sub_path)
print(count)
堆栈法清空文件夹
# -*- coding:utf-8 -*-
import os
def del_dir(filepath):
lis = [filepath]
lst = []
while lis:
path = lis.pop()
for f in os.listdir(path):
sub_path = os.path.join(path,f)
if os.path.isfile(sub_path):
os.remove(sub_path)
elif os.path.isdir(sub_path):
lis.append(sub_path)
lst.append(sub_path)
# 把所有文件删完后,再处理里面的空文件夹
# 注意这里 倒序 取!从里往外删
for f in lst[::-1]:
os.rmdir(f)
# 最后再把最外面的目录删掉 —— 整个目录就没得了
os.rmdir(filepath)
分割存放连续时间字符串的列表
itertools.chain实现同时遍历多个可迭代对象
""" 一、调用itertools的chain方法实现"""
from itertools import chain
my_list = [1,2,3]
my_dict = {
"html1":"http://projectsedu.com",
"html2":"http://www.imooc.com",
}
for value in chain(my_list,my_dict,range(1,5)):
print(value)
"""
1
2
3
html1
html2
1
2
3
4
"""
""" 二、使用yield实现该功能"""
my_list = [1,2,3]
my_dict = {
"html1":"http://projectsedu.com",
"html2":"http://www.imooc.com",
}
# 自己是心啊一个chain效果的函数
def my_chain(*args, **kwargs):
for my_iterable in args:
for value in my_iterable:
yield value
for value in my_chain(my_list, my_dict, range(5,7)):
print(value)
""" 三、使用yield from实现该功能 """
my_list = [1,2,3]
my_dict = {
"html1":"http://projectsedu.com",
"html2":"http://www.imooc.com",
}
def my_chain(*args, **kwargs):
for my_iterable in args:
yield from my_iterable
for value in my_chain(my_list, my_dict, range(5,7)):
print(value)
自己实现一个计算器
# -*- coding:utf-8 -*-
import re
def mul_div(exp):
"""
计算两个数的乘除法
:return:返回str类型的结果,因为后面要用这个结果replace掉之前表达式
"""
if '*' in exp:
a,b = exp.split('*')
return str(float(a) * float(b))
else:
a,b = exp.split('/')
return str(float(a) / float(b))
def format_exp(exp):
"""
整理加减运算的格式
"""
exp = exp.replace('--','+')
exp = exp.replace('-+','-')
exp = exp.replace('++','+')
exp = exp.replace('+-','-')
return exp
def cal_no_bracket(no_bracket_exp):
"""
计算内部不含括号的表达式的值
:param no_bracket_exp: 字符串类型的内部没有小括号的表达式:'(9-2*5/3+7/3*99/4*2998+10*568/14)'
:return: 返回float类型的值
"""
# 先计算乘除法——两个数两个数的计算
while 1:
# 含有乘除符号的表达式
# 注意这里用search找!只找*/符号两边的,利用循环,一次次的去找
# 注意,乘除符号后面可能有-号,这种情况也要考虑进去
mul_div_exp = re.search('\d+(\.\d+)?[*/]-?\d+(\.\d+)?', no_bracket_exp)
# 找到表示有两个数的乘除法
if mul_div_exp:
# 注意用group取值
md_exp = mul_div_exp.group()
# 计算简单的两个数的乘除法
res = mul_div(md_exp)
# 计算出来后将之前的表达式换成计算成的值
no_bracket_exp = no_bracket_exp.replace(md_exp,res)
# 没有表示没有乘除法运算了,直接跳出循环计算加减法
else:
break
# 这时只有加减法了,先把表达式的符号管理一下
no_bracket_exp = format_exp(no_bracket_exp)
# 接着计算加减法,用findall找到所有的正负数,记得取消分组优先
res_list = re.findall('[+\-]?\d+(?:\.\d+)?',no_bracket_exp)
# 进行累加
sum = 0
for i in res_list:
sum += float(i)
return sum
def remove_bracket(exp):
"""
将表达式中所有的括号都计算出结果,并替换原有的表达式,直到这个表达式中不含有括号
:param exp:str类型的带有括号的表达式:'1-2*((60-30+(9-2*5/3+7/3*99/4*2998+10*568/14)*(-40/5))-(-4*3)/(16-3*2))'
:return:str类型不带括号的表达式:'1-2*-1388335.8476190479'
"""
while 1:
no_bracket = re.search('\([^()]+\)',exp)
if no_bracket:
# 记的从结果集中用group方法取到结果
no_bracket_exp = no_bracket.group()
# 计算括号内不含括号的表达式的结果
res = cal_no_bracket(no_bracket_exp)
## 最后把之前括号内的表达式替换成最终的结果
# 记得 res是float类型的,需要转成str类型进行操作
exp = exp.replace(no_bracket_exp,str(res))
# 没有表示表达式中没括号了
else:
break
# 最终将没有括号的表达式return
return exp
def calculator(origin_exp):
"""
处理原始的表达式,先去掉表达式中的空格,然后去掉括号并且进行最后一步的计算
:param origin_exp: 原始的表达式
:return: float类型的结果
"""
# 先处理掉原始表达式中的空格
exp = origin_exp.replace(' ','')
# 计算呆空格的结果
res = remove_bracket(exp)
# 然后计算没有空格的表达式的结果
ret = cal_no_bracket(res)
# 返回 float类型的结果
return ret
if __name__ == '__main__':
exp = '1 - 2 * ( (60-30 + (9-2*5/3 + 7 /3*99/4*2998 +10 * 568/14 )*(-40/5)) - (-4*3)/ (16-3*2) )'
print(calculator(exp))
print(eval(exp))
"""
2776672.6952380957
2776672.6952380957
"""
Python多属性排序
lst = [1, -2, 10, -12, -4, -5, 9, 2]
# 将列表进行排序,正数在前,负数在后,并且分别按绝对值从小到大
lst.sort(key=lambda x:(x<0,abs(x)))
print(lst) # [1, 2, 9, 10, -2, -4, -5, -12]
python3中的typehint
例1
# -*- coding:utf-8 -*-
def int_hint(a:int,b:int)->int:
return a + b
print(int_hint(11,22))
print(int_hint('aa','bb'))
class People(object):
def __init__(self,name):
self.name = name
def __str__(self):
return self.name
class Dog(object):
def __init__(self,name):
self.name = name
def __str__(self):
return self.name
def sing(name:People)->None:
print('{} is singing...'.format(name))
def bark(name:Dog)->None:
print('{} is barking...'.format(name))
Tom = People('Tom')
Jerry = Dog('Jerry')
sing(Tom)
sing(Jerry)
bark(Jerry)
"""
33
aabb
Tom is singing...
Jerry is singing...
Jerry is barking...
"""
例2
# -*- coding:utf-8 -*-
import typing
def create_dic(name:str,age:int)->dict:
return {
'name':name,
'age':age
}
def my_callable(name:str,age:int,callback:typing.Callable[[str,int],dict]):
ret = callback(name,age)
return ret
# 将create_dict函数传进去,注意是在my_callable函数中执行
print(my_callable('wanghw',18,create_dic)) # {'name': 'wanghw', 'age': 18}
堆栈的几个练习
文件修改的两种方式
遍历文件夹下的所有文件
# -*- coding:utf-8 -*-
import os
for item in os.listdir('/Users/wanghongwei/PycharmProjects/deploy27/deploy'):
print(item)
for item in os.walk('/Users/wanghongwei/PycharmProjects/deploy27/deploy'):
print(item)
重命名与删除
# -*- coding:utf-8 -*-
import shutil
shutil.move('/Users/wanghongwei/PycharmProjects/deploy27/deploy1','/Users/wanghongwei/PycharmProjects/deploy27/deploy')
shutil.rmtree('/Users/wanghongwei/PycharmProjects/deploy27/t')
杀进程
# -*- coding:utf-8 -*-
import os
import signal
import subprocess
import commands
output = subprocess.check_output("pgrep -f python", shell=True)
pid_list = map(int, output.split())
for pid in pid_list:
os.kill(pid, signal.SIGKILL)
salt推送文件
# -*- coding:utf-8 -*-
"""
SaltAPI推送文件
"""
'''#### 基于SSH:API ####'''
from salt.client.ssh.client import SSHClient
client = SSHClient()
# 执行命令
result = client.cmd('*', 'cmd.run', ('ls',))
# 调用grains
ret = client.cmd('*','grains.items')
# 调用pillar
ret = client.cmd('*','pillar.items')
# 执行 state
ret = client.cmd('*','state.sls',('fengfeng','pillar={"xxxx":"luffy"}'))
# 发送文件
ret = client.cmd('*','cp.get_file',('salt://fengfeng/files/test.conf','/data/s1.conf'))
# 发送文件
ret = client.cmd('*','cp.get_url',('http://www.pythonav.com/allstatic/imgs/mv/picture/2.jpeg','/data/s1.jpeg'))
'''#### 基于Master:API ####'''
import salt.client
local = salt.client.LocalClient()
# 执行命令
result = client.cmd('*', 'cmd.run', ('ls',))
# 调用grains
ret = client.cmd('*','grains.items')
# 调用pillar
ret = client.cmd('*','pillar.items')
# 执行 state
ret = client.cmd('*','state.sls',('fengfeng','pillar={"xxxx":"luffy"}'))
# 发送文件
ret = client.cmd('*','cp.get_file',('salt://fengfeng/files/test.conf','/data/s1.conf'))
# 发送文件
ret = client.cmd('*','cp.get_url',('http://www.pythonav.com/allstatic/imgs/mv/picture/2.jpeg','/data/s1.jpeg'))
抖音
# -*- coding:utf-8 -*-
import re
import requests
url = 'https://www.douyin.com/share/user/6556303280/?share_type=link' # 75097696932/96488770253/6556303280/67561351000
# ###################### 1. 根据url获取用户ID #########################
user_id = re.findall('share/user/(.*)/\?', url)[0]
# ###################### 2. 根据用户ID获取签名 #########################
# 方式一:
import os
output = os.popen('node s1.js %s' % user_id)
# output = os.popen('node byted-acrawler.js %s' % user_id)
signature = output.readlines()[0].strip()
# 方式二:
"""
def exec_javascript(file_path, func, *args):
pip3 install execjs
import execjs
with open(file_path, 'r', encoding='UTF-8') as f:
content = f.read()
ctx = execjs.compile(content)
return ctx.call(func, *args)
signature = exec_javascript("encode.js", '_bytedAcrawler.sign', user_id)
"""
# ###################### 3. 获取抖音用户发送过的所有的视频 #########################
headers = {
'accept': 'application/json',
'accept-encoding': 'gzip, deflate, br',
'accept-language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
'upgrade-insecure-requests': '1',
'host':'www.douyin.com',
'X-Requested-With':'XMLHttpRequest',
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36',
}
user_video_list = []
user_video_params = {
'user_id': str(user_id),
'count': '21',
'max_cursor': '0',
'aid': '1128',
'_signature': signature,
'dytk': '114f1984d1917343ccfb14d94e7ce5f5'
}
def get_aweme_list(max_cursor=None):
if max_cursor:
user_video_params['max_cursor'] = str(max_cursor)
res = requests.get(
url="https://www.douyin.com/aweme/v1/aweme/post/",
params=user_video_params,
headers=headers
)
content_json = res.json()
print(content_json)
aweme_list = content_json.get('aweme_list', [])
user_video_list.extend(aweme_list)
if content_json.get('has_more') == 1:
return get_aweme_list(content_json.get('max_cursor'))
get_aweme_list()
print("该用户发布过抖音", user_video_list)
# ###################### 4. 获取抖音赞过的所有视频 #########################
#
# favor_video_list = []
#
# favor_video_params = {
# 'user_id': str(user_id),
# 'count': '21',
# 'max_cursor': '0',
# 'aid': '1128',
# '_signature': signature
# }
#
#
# def get_favor_list(max_cursor=None):
# if max_cursor:
# favor_video_params['max_cursor'] = str(max_cursor)
# res = requests.get(
# url="https://www.douyin.com/aweme/v1/aweme/favorite/",
# params=favor_video_params,
# headers=headers
# )
# content_json = res.json()
# aweme_list = content_json.get('aweme_list', [])
# favor_video_list.extend(aweme_list)
# if content_json.get('has_more') == 1:
# return get_favor_list(content_json.get('max_cursor'))
#
#
# get_favor_list()
#
# print("该用户赞过抖音", favor_video_list)
# ###################### 5. 下载抖音 #########################
"""
base_download_folder = os.path.join('download', user_id)
if not os.path.isdir(base_download_folder):
os.mkdir(base_download_folder)
# 下载自己的视频
user_download_folder = os.path.join(base_download_folder, 'user')
if not os.path.isdir(user_download_folder):
os.mkdir(user_download_folder)
for aweme in user_video_list:
if aweme.get('video', None):
video_id = aweme['video']['play_addr']['uri']
file_name = video_id + ".mp4"
file_path = os.path.join(user_download_folder, file_name)
response_video = requests.get(
url='https://aweme.snssdk.com/aweme/v1/play/',
params={
'video_id': video_id,
},
stream=True,
)
with open(file_path, 'wb') as fh:
for chunk in response_video.iter_content(chunk_size=1024):
fh.write(chunk)
# 下载赞过的视频
"""
"""
favor_download_folder = os.path.join(base_download_folder, 'favor')
if not os.path.isdir(favor_download_folder):
os.mkdir(favor_download_folder)
for aweme in favor_video_list:
if aweme.get('video', None):
video_id = aweme['video']['play_addr']['uri']
file_name = video_id + ".mp4"
file_path = os.path.join(favor_download_folder, file_name)
response_video = requests.get(
url='https://aweme.snssdk.com/aweme/v1/play/',
params={
'video_id': video_id,
},
stream=True,
)
with open(file_path, 'wb') as fh:
for chunk in response_video.iter_content(chunk_size=1024):
fh.write(chunk)
"""
iter方法读取文件
用迭代器去读取文件中的内容,防止文件太大一次性读取的话占用过大内存!
def iter_file(path, size=1024):
with open(path, "rb", ) as f:
for data in iter(lambda: f.read(size), b''):
yield data
for row in iter_file("manage.py"):
print(row)
开启多线程-爬取科斯林词典
import re
import random
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import cpu_count
# ------------------------- 制作英文词典 --------------------------------------
rex = re.compile(r'[-&()/\.]+')
def bar(url):
response = requests.get(url=url)
soup = BeautifulSoup(response.text, 'html.parser')
ul_obj = soup.find(name='ul', attrs={'class', 'columns2 browse-list'})
return ul_obj.find_all(name='a')
def worker(url):
"""
拿到具体的连接,https://www.collinsdictionary.com/browse/english/words-starting-with-a
如上链接,是所有以a开头的单词集合
"""
a_list = bar(url='https://www.collinsdictionary.com/browse/english/words-starting-with-{}'.format(url[0]))
for item in a_list:
for i in bar(item.get('href')):
res = i.text
if not re.findall(rex, res) and len(res) > 2:
print(res)
url[1].write('{}\n'.format(res))
def spider_collins():
"""
爬取柯林斯网站所有的单词,链接深度共三层,
第一层获取24个字母的连接,
第二层获取以字母开头的所有短语或单词,
第三层,就是具体的一个个单词了
"""
f = open('w.txt', 'a', encoding='utf8')
t = ThreadPoolExecutor(cpu_count() * 5)
for i in range(ord('a'), ord('z') + 1): # 97 ~ 122
t.submit(worker, (chr(i), f))
# break
t.shutdown()
f.close()
递归嵌套列表的练习
递归实现列表的嵌套查找
# 将列表lis中的"tt"变成⼤写
def recursion1(lis):
for i, v in enumerate(lis):
#这里的i是列表元素的下标,v为列表中的元素
#如果列表的元素为tt,直接转大写
if v == 'tt':
lis[i] = v.upper()
#如果遍历到的的元素是个列表
elif isinstance(v, list):
#递归操作
recursion1(v)
#将列表中的数字3变成字符串"100"
def recursion2(lis):
for i,v in enumerate(lis):
if v == 3:
lis[i] = '100'
elif isinstance(v,list):
recursion2(v)
#将列表中的字符串"1"变成数字101
def recursion3(lis):
for i,v in enumerate(lis):
if v == '1':
lis[i] = 101
elif isinstance(v,list):
recursion3(v)
if __name__ == '__main__':
lis = [2, 3, "k", ["qwe", 20, ["k1", ["tt", 3, "1"]], 89], "ab", "adv"]
# a1. 将列表lis中的"tt"变成⼤写(⽤两种⽅式)。
#方法一:
# lis[3][2][1][0] = lis[3][2][1][0].upper()
# print(lis) #[2, 3, 'k', ['qwe', 20, ['k1', ['TT', 3, '1']], 89], 'ab', 'adv']
#方法二:
recursion1(lis)
print(lis) #[2, 3, 'k', ['qwe', 20, ['k1', ['TT', 3, '1']], 89], 'ab', 'adv']
# b. 将列表中的数字3变成字符串"100"(⽤两种⽅式)。
#方法一:
# lis[1],lis[3][2][1][1] = '100','100'
# print(lis)
#方法二:
recursion2(lis)
print(lis)
# c. 将列表中的字符串"1"变成数字101(⽤两种⽅式)。
#方法一:
# lis[3][2][1][2] = 101
# print(lis)
recursion3(lis)
print(lis)
递归实现循环打印列表的每个元素
def recursion_list(lis):
for i in lis:
if isinstance(i,list):
recursion_list(i)
else:
print(i,end=" ")
if __name__ == '__main__':
li = [1, 3, 4, "whw", [3, 7, 8, "WANGhw"], 5, "HH"]
recursion_list(li)
"""
1 3 4 whw 3 7 8 WANGhw 5 HH
"""
socket传输是选择是否根据协议接受与发送
# -*- coding:utf-8 -*-
import json
import struct
def pro_send(sk,dic,pro=True):
str_dic = json.dumps(dic)
bytes_dic = str_dic.encode('utf-8')
# 默认选择协议发送
if pro:
num_dic = struct.pack('i',len(bytes_dic))
sk.sendall(num_dic)
sk.sendall(bytes_dic)
def pro_recv(sk,pro=True):
# 依据协议先收到的是字典的长度
if pro:
num_dic = sk.recv(4)
# 注意unpack得到的是一个元组,需要加上0索引取值
num = struct.unpack('i',num_dic)[0]
str_dic = sk.recv(num).decode('utf-8')
dic = json.loads(str_dic)
# 不根据协议接收直接用1024大小的长度来接收
else:
dic = json.loads(sk.recv(1024).decode('utf-8'))
return dic