当前位置: 首页 > news >正文

改造python3中的http.server为简单的文件上传下载服务

改造

修改python3中的http.server.SimpleHTTPRequestHandler,实现简单的文件上传下载服务

 simple_http_file_server.py:

# !/usr/bin/env python3import datetime
import email
import html
import http.server
import io
import mimetypes
import os
import posixpath
import re
import shutil
import sys
import urllib.error
import urllib.parse
import urllib.request
import socket
from http import HTTPStatus
import threading
import contextlib__version__ = "0.1"
__all__ = ["MySimpleHTTPRequestHandler"]class MySimpleHTTPRequestHandler(http.server.BaseHTTPRequestHandler):server_version = "SimpleHTTP/" + __version__extensions_map = _encodings_map_default = {'.gz': 'application/gzip','.Z': 'application/octet-stream','.bz2': 'application/x-bzip2','.xz': 'application/x-xz',}def __init__(self, *args, directory=None, **kwargs):if directory is None:directory = os.getcwd()self.directory = os.fspath(directory)super().__init__(*args, **kwargs)def do_GET(self):f = self.send_head()if f:try:self.copyfile(f, self.wfile)finally:f.close()def do_HEAD(self):f = self.send_head()if f:f.close()def send_head(self):path = self.translate_path(self.path)f = Noneif os.path.isdir(path):parts = urllib.parse.urlsplit(self.path)if not parts.path.endswith('/'):# redirect browser - doing basically what apache doesself.send_response(HTTPStatus.MOVED_PERMANENTLY)new_parts = (parts[0], parts[1], parts[2] + '/',parts[3], parts[4])new_url = urllib.parse.urlunsplit(new_parts)self.send_header("Location", new_url)self.end_headers()return Nonefor index in "index.html", "index.htm":index = os.path.join(path, index)if os.path.exists(index):path = indexbreakelse:return self.list_directory(path)ctype = self.guess_type(path)if path.endswith("/"):self.send_error(HTTPStatus.NOT_FOUND, "File not found")return Nonetry:f = open(path, 'rb')except OSError:self.send_error(HTTPStatus.NOT_FOUND, "File not found")return Nonetry:fs = os.fstat(f.fileno())# Use browser cache if possibleif ("If-Modified-Since" in self.headersand "If-None-Match" not in self.headers):# compare If-Modified-Since and time of last file modificationtry:ims = email.utils.parsedate_to_datetime(self.headers["If-Modified-Since"])except (TypeError, IndexError, OverflowError, ValueError):# ignore ill-formed valuespasselse:if ims.tzinfo is None:# obsolete format with no timezone, cf.# https://tools.ietf.org/html/rfc7231#section-7.1.1.1ims = ims.replace(tzinfo=datetime.timezone.utc)if ims.tzinfo is datetime.timezone.utc:# compare to UTC datetime of last modificationlast_modif = datetime.datetime.fromtimestamp(fs.st_mtime, datetime.timezone.utc)# remove microseconds, like in If-Modified-Sincelast_modif = last_modif.replace(microsecond=0)if last_modif <= ims:self.send_response(HTTPStatus.NOT_MODIFIED)self.end_headers()f.close()return Noneself.send_response(HTTPStatus.OK)self.send_header("Content-type", ctype)self.send_header("Content-Length", str(fs[6]))self.send_header("Last-Modified",self.date_time_string(fs.st_mtime))self.end_headers()return fexcept:f.close()raisedef list_directory(self, path):try:list_dir = os.listdir(path)except OSError:self.send_error(HTTPStatus.NOT_FOUND, "No permission to list_dir directory")return Nonelist_dir.sort(key=lambda a: a.lower())r = []try:display_path = urllib.parse.unquote(self.path, errors='surrogatepass')except UnicodeDecodeError:display_path = urllib.parse.unquote(path)display_path = html.escape(display_path, quote=False)enc = sys.getfilesystemencoding()form = """<h1>文件上传</h1>\n<form ENCTYPE="multipart/form-data" method="post">\n<input name="file" type="file"/>\n<input type="submit" value="upload"/>\n</form>\n"""title = 'Directory listing for %s' % display_pathr.append('<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" ''"http://www.w3.org/TR/html4/strict.dtd">')r.append('<html>\n<head>')r.append('<meta http-equiv="Content-Type" ''content="text/html; charset=%s">' % enc)r.append('<title>%s</title>\n</head>' % title)r.append('<body>%s\n<h1>%s</h1>' % (form, title))r.append('<hr>\n<ul>')for name in list_dir:fullname = os.path.join(path, name)displayname = linkname = name# Append / for directories or @ for symbolic linksif os.path.isdir(fullname):displayname = name + "/"linkname = name + "/"if os.path.islink(fullname):displayname = name + "@"# Note: a link to a directory displays with @ and links with /r.append('<li><a href="%s">%s</a></li>' % (urllib.parse.quote(linkname, errors='surrogatepass'),html.escape(displayname, quote=False)))r.append('</ul>\n<hr>\n</body>\n</html>\n')encoded = '\n'.join(r).encode(enc, 'surrogate escape')f = io.BytesIO()f.write(encoded)f.seek(0)self.send_response(HTTPStatus.OK)self.send_header("Content-type", "text/html; charset=%s" % enc)self.send_header("Content-Length", str(len(encoded)))self.end_headers()return fdef translate_path(self, path):# abandon query parameterspath = path.split('?', 1)[0]path = path.split('#', 1)[0]# Don't forget explicit trailing slash when normalizing. Issue17324trailing_slash = path.rstrip().endswith('/')try:path = urllib.parse.unquote(path, errors='surrogatepass')except UnicodeDecodeError:path = urllib.parse.unquote(path)path = posixpath.normpath(path)words = path.split('/')words = filter(None, words)path = self.directoryfor word in words:if os.path.dirname(word) or word in (os.curdir, os.pardir):# Ignore components that are not a simple file/directory namecontinuepath = os.path.join(path, word)if trailing_slash:path += '/'return pathdef copyfile(self, source, outputfile):shutil.copyfileobj(source, outputfile)def guess_type(self, path):base, ext = posixpath.splitext(path)if ext in self.extensions_map:return self.extensions_map[ext]ext = ext.lower()if ext in self.extensions_map:return self.extensions_map[ext]guess, _ = mimetypes.guess_type(path)if guess:return guessreturn 'application/octet-stream'def do_POST(self):r, info = self.deal_post_data()self.log_message('%s, %s => %s' % (r, info, self.client_address))enc = sys.getfilesystemencoding()res = ['<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" ''"http://www.w3.org/TR/html4/strict.dtd">','<html>\n<head>','<meta http-equiv="Content-Type" content="text/html; charset=%s">' % enc,'<title>%s</title>\n</head>' % "Upload Result Page",'<body><h1>%s</h1>\n' % "Upload Result"]if r:res.append('<p>SUCCESS: %s</p>\n' % info)else:res.append('<p>FAILURE: %s</p>' % info)res.append('<a href=\"%s\">back</a>' % self.headers['referer'])res.append('</body></html>')encoded = '\n'.join(res).encode(enc, 'surrogate escape')f = io.BytesIO()f.write(encoded)length = f.tell()f.seek(0)self.send_response(200)self.send_header("Content-type", "text/html")self.send_header("Content-Length", str(length))self.end_headers()if f:self.copyfile(f, self.wfile)f.close()def deal_post_data(self):content_type = self.headers['content-type']if not content_type:return False, "Content-Type header doesn't contain boundary"boundary = content_type.split("=")[1].encode()remain_bytes = int(self.headers['content-length'])line = self.rfile.readline()remain_bytes -= len(line)if boundary not in line:return False, "Content NOT begin with boundary"line = self.rfile.readline()remain_bytes -= len(line)fn = re.findall(r'Content-Disposition.*name="file"; filename="(.*)"', line.decode())if not fn:return False, "Can't find out file name..."path = self.translate_path(self.path)fn = os.path.join(path, fn[0])line = self.rfile.readline()remain_bytes -= len(line)line = self.rfile.readline()remain_bytes -= len(line)try:out = open(fn, 'wb')except IOError:return False, "Can't create file to write, do you have permission to write?"preline = self.rfile.readline()remain_bytes -= len(preline)while remain_bytes > 0:line = self.rfile.readline()remain_bytes -= len(line)if boundary in line:preline = preline[0:-1]if preline.endswith(b'\r'):preline = preline[0:-1]out.write(preline)out.close()return True, "File '%s' upload success!" % fnelse:out.write(preline)preline = linereturn False, "Unexpect Ends of data."def _get_best_family(*address):infos = socket.getaddrinfo(*address,type=socket.SOCK_STREAM,flags=socket.AI_PASSIVE,)family, type, proto, canonname, sockaddr = next(iter(infos))return family, sockaddrdef serve_forever(port=8000, bind=None, directory="."):"""This runs an HTTP server on port 8000 (or the port argument)."""# ensure dual-stack is not disabled; ref #38907class DualStackServer(http.server.ThreadingHTTPServer):def server_bind(self):# suppress exception when protocol is IPv4with contextlib.suppress(Exception):self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)return super().server_bind()def finish_request(self, request, client_address):self.RequestHandlerClass(request, client_address, self,directory=directory)HandlerClass=MySimpleHTTPRequestHandlerServerClass=DualStackServerprotocol="HTTP/1.0"ServerClass.address_family, addr = _get_best_family(bind, port)HandlerClass.protocol_version = protocolwith ServerClass(addr, HandlerClass) as httpd:host, port = httpd.socket.getsockname()[:2]url_host = f'[{host}]' if ':' in host else hostprint(f"Serving HTTP on {host} port {port} "f"(http://{url_host}:{port}/) ...")try:httpd.serve_forever()except KeyboardInterrupt:print("\nKeyboard interrupt received, exiting.")sys.exit(0)if __name__ == '__main__':import argparseparser = argparse.ArgumentParser()parser.add_argument('--bind', '-b', metavar='ADDRESS',help='specify alternate bind address ''(default: all interfaces)')parser.add_argument('--directory', '-d', default=os.getcwd(),help='specify alternate directory ''(default: current directory)')parser.add_argument('port', action='store', default=8000, type=int,nargs='?',help='specify alternate port (default: 8000)')args = parser.parse_args()# 在主线程中执行# serve_forever(# 	port=args.port,# 	bind=args.bind,#	directory = args.directory# )# 在子线程中执行,这样主线程可以执行异步工作thread1 = threading.Thread(name='t1',target= serve_forever,kwargs={"port" : args.port,"bind" : args.bind,"directory" : args.directory})thread1.start()# thread1.join()

使用帮助:

>python3 simple_http_file_server.py --help              
usage: simple_http_file_server.py [-h] [--cgi] [--bind ADDRESS] [--directory DIRECTORY] [port]

positional arguments:
  port                  specify alternate port (default: 8000)

optional arguments:
  -h, --help            show this help message and exit
  --cgi                 run as CGI server
  --bind ADDRESS, -b ADDRESS
                        specify alternate bind address (default: all interfaces)
  --directory DIRECTORY, -d DIRECTORY
                        specify alternate directory (default: current directory)

使用示例:

>python3 simple_http_file_server.py -d ~ 12345
Serving HTTP on :: port 12345 (http://[::]:12345/) ...
::ffff:127.0.0.1 - - [30/Nov/2023 09:35:06] "GET / HTTP/1.1" 200 -

。。。

访问:

浏览器中访问:http://127.0.0.1:12345/


参考:

        Python3 实现简单HTTP服务器(附带文件上传)_python3 -m http.server-CSDN博客

http://www.lryc.cn/news/252198.html

相关文章:

  • Fiddler抓包工具之fiddler的composer可以简单发送http协议的请求
  • 14、pytest像用参数一样使用fixture
  • C++ Primer Plus第十三章笔记
  • 【JavaEE】单例模式
  • 第十五届蓝桥杯模拟赛(第二期 C++)
  • 关于Unity中字典在Inspector的显示
  • 使用Plex结合cpolar搭建本地私人媒体站并实现远程访问
  • svn合并冲突时每个选项的含义
  • 指针、数组与函数例题3
  • ThreeJs样例 webgl_shadow_contact 分析
  • Nginx(缓冲区)
  • MQTT协议理解并实践
  • 实现一个简单的网络通信下(udp)
  • Linux中office环境LibreOffice_7.6.2下载
  • Linux快捷控制
  • 免费插件集-illustrator插件-Ai插件-重复复制-单一对象页面排版
  • GO基础之变量与常量
  • Docker Compose简单入门
  • 使用 PHPMailer 实现邮件的实时发送
  • 在Spring Boot中使用JavaMailSender发送邮件
  • python动态圣诞下雪图
  • 随心玩玩(十)git
  • 每日一练【盛最多水的容器】
  • Linux C语言 38-进程间通信IPC之信号
  • 前端使用 xlsx.js 工具读取 excel 遇到时间日期少 43 秒的解决办法
  • 问题记录-maven依赖升级或替换(简单版)
  • 00Hadoop数据仓库平台
  • java-jar包
  • Flink运行时架构核心概念
  • docker安装达梦数据库并挂在数据卷