当前位置: 首页 > news >正文

python3 简易 http server:实现本地与远程服务器传大文件

在个人目录下创建新文件httpserver.py

vim httpserver.py

文件内容为python3代码:

#!/usr/bin/env python3
import datetime
import email
import html
import http.server
import io
import mimetypes
import os
import posixpath
import re
import shutil
import sys
import urllib.error
import urllib.parse
import urllib.request
import email.utils  # explicit: `import email` alone does not load the submodule
from http import HTTPStatus

__version__ = "0.1"
__all__ = ["SimpleHTTPRequestHandler"]


class SimpleHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
    """Serve files below ``directory`` and accept single-file uploads.

    GET/HEAD behave like the standard library's simple file server
    (directory listings, ``index.html`` lookup, ``If-Modified-Since``
    handling).  Directory listings additionally contain an upload form whose
    POST request is handled by :meth:`do_POST` / :meth:`deal_post_data`.
    """

    server_version = "SimpleHTTP/" + __version__

    # Extension -> MIME type overrides consulted before ``mimetypes``.
    extensions_map = _encodings_map_default = {
        '.gz': 'application/gzip',
        '.Z': 'application/octet-stream',
        '.bz2': 'application/x-bzip2',
        '.xz': 'application/x-xz',
    }

    # Extracts the client-supplied file name from a part's
    # Content-Disposition header.  ``[^"]*`` (instead of a greedy ``.*``)
    # stops at the closing quote of the filename parameter.
    _filename_re = re.compile(
        r'Content-Disposition.*name="file"; filename="([^"]*)"',
        re.IGNORECASE)

    # Extracts the multipart boundary (quoted or bare) from Content-Type.
    _boundary_re = re.compile(
        r'boundary=(?:"([^"]+)"|([^;\s]+))', re.IGNORECASE)

    def __init__(self, *args, directory=None, **kwargs):
        """Remember the directory to serve (defaults to the current one)."""
        if directory is None:
            directory = os.getcwd()
        self.directory = os.fspath(directory)
        super().__init__(*args, **kwargs)

    def do_GET(self):
        """Serve a GET request."""
        f = self.send_head()
        if f:
            try:
                self.copyfile(f, self.wfile)
            finally:
                f.close()

    def do_HEAD(self):
        """Serve a HEAD request (headers only)."""
        f = self.send_head()
        if f:
            f.close()

    def send_head(self):
        """Send the response headers shared by GET and HEAD.

        Returns:
            An open binary file object positioned at the start of the body
            (the caller must close it), or ``None`` when the response is
            already complete (redirect, 304, or error).
        """
        path = self.translate_path(self.path)
        f = None
        if os.path.isdir(path):
            parts = urllib.parse.urlsplit(self.path)
            if not parts.path.endswith('/'):
                # redirect browser - doing basically what apache does
                self.send_response(HTTPStatus.MOVED_PERMANENTLY)
                new_parts = (parts[0], parts[1], parts[2] + '/',
                             parts[3], parts[4])
                new_url = urllib.parse.urlunsplit(new_parts)
                self.send_header("Location", new_url)
                self.end_headers()
                return None
            for index in "index.html", "index.htm":
                index = os.path.join(path, index)
                if os.path.exists(index):
                    path = index
                    break
            else:
                return self.list_directory(path)
        ctype = self.guess_type(path)
        # A trailing slash on something that is not a directory can never
        # name a regular file.
        if path.endswith("/"):
            self.send_error(HTTPStatus.NOT_FOUND, "File not found")
            return None
        try:
            f = open(path, 'rb')
        except OSError:
            self.send_error(HTTPStatus.NOT_FOUND, "File not found")
            return None
        try:
            fs = os.fstat(f.fileno())
            # Use browser cache if possible
            if ("If-Modified-Since" in self.headers
                    and "If-None-Match" not in self.headers):
                # compare If-Modified-Since and time of last file modification
                try:
                    ims = email.utils.parsedate_to_datetime(
                        self.headers["If-Modified-Since"])
                except (TypeError, IndexError, OverflowError, ValueError):
                    # ignore ill-formed values
                    pass
                else:
                    if ims.tzinfo is None:
                        # obsolete format with no timezone, cf.
                        # https://tools.ietf.org/html/rfc7231#section-7.1.1.1
                        ims = ims.replace(tzinfo=datetime.timezone.utc)
                    if ims.tzinfo is datetime.timezone.utc:
                        # compare to UTC datetime of last modification
                        last_modif = datetime.datetime.fromtimestamp(
                            fs.st_mtime, datetime.timezone.utc)
                        # remove microseconds, like in If-Modified-Since
                        last_modif = last_modif.replace(microsecond=0)
                        if last_modif <= ims:
                            self.send_response(HTTPStatus.NOT_MODIFIED)
                            self.end_headers()
                            f.close()
                            return None
            self.send_response(HTTPStatus.OK)
            self.send_header("Content-type", ctype)
            self.send_header("Content-Length", str(fs[6]))
            self.send_header("Last-Modified",
                             self.date_time_string(fs.st_mtime))
            self.end_headers()
            return f
        except BaseException:
            # Never leak the descriptor, whatever went wrong; the error
            # itself is re-raised unchanged.
            f.close()
            raise

    def list_directory(self, path):
        """Send an HTML listing of ``path`` that includes the upload form.

        Returns:
            A ``BytesIO`` holding the page body, or ``None`` if the directory
            could not be read (an error response has then been sent).
        """
        try:
            list_dir = os.listdir(path)
        except OSError:
            self.send_error(HTTPStatus.NOT_FOUND,
                            "No permission to list directory")
            return None
        list_dir.sort(key=lambda a: a.lower())
        r = []
        try:
            display_path = urllib.parse.unquote(self.path,
                                                errors='surrogatepass')
        except UnicodeDecodeError:
            display_path = urllib.parse.unquote(path)
        display_path = html.escape(display_path, quote=False)
        enc = sys.getfilesystemencoding()
        form = """<h1>文件上传</h1>\n<form ENCTYPE="multipart/form-data" method="post">\n<input name="file" type="file"/>\n<input type="submit" value="upload"/>\n</form>\n"""
        title = 'Directory listing for %s' % display_path
        r.append('<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" '
                 '"http://www.w3.org/TR/html4/strict.dtd">')
        r.append('<html>\n<head>')
        r.append('<meta http-equiv="Content-Type" '
                 'content="text/html; charset=%s">' % enc)
        r.append('<title>%s</title>\n</head>' % title)
        r.append('<body>%s\n<h1>%s</h1>' % (form, title))
        r.append('<hr>\n<ul>')
        for name in list_dir:
            fullname = os.path.join(path, name)
            displayname = linkname = name
            # Append / for directories or @ for symbolic links
            if os.path.isdir(fullname):
                displayname = name + "/"
                linkname = name + "/"
            if os.path.islink(fullname):
                displayname = name + "@"
                # Note: a link to a directory displays with @ and links with /
            r.append('<li><a href="%s">%s</a></li>' % (
                urllib.parse.quote(linkname, errors='surrogatepass'),
                html.escape(displayname, quote=False)))
        r.append('</ul>\n<hr>\n</body>\n</html>\n')
        # 'surrogateescape' (one word) is the registered handler name; it
        # lets undecodable file names round-trip back to their raw bytes.
        encoded = '\n'.join(r).encode(enc, 'surrogateescape')
        f = io.BytesIO()
        f.write(encoded)
        f.seek(0)
        self.send_response(HTTPStatus.OK)
        self.send_header("Content-type", "text/html; charset=%s" % enc)
        self.send_header("Content-Length", str(len(encoded)))
        self.end_headers()
        return f

    def translate_path(self, path):
        """Map a URL path to a filesystem path below ``self.directory``.

        Query string and fragment are dropped, percent-escapes are decoded,
        and components that are not plain names (``.``, ``..``, or anything
        containing a directory separator) are ignored, so the result never
        leaves ``self.directory``.
        """
        # abandon query parameters
        path = path.split('?', 1)[0]
        path = path.split('#', 1)[0]
        # Don't forget explicit trailing slash when normalizing. Issue17324
        trailing_slash = path.rstrip().endswith('/')
        try:
            path = urllib.parse.unquote(path, errors='surrogatepass')
        except UnicodeDecodeError:
            path = urllib.parse.unquote(path)
        path = posixpath.normpath(path)
        words = path.split('/')
        words = filter(None, words)
        path = self.directory
        for word in words:
            if os.path.dirname(word) or word in (os.curdir, os.pardir):
                # Ignore components that are not a simple file/directory name
                continue
            path = os.path.join(path, word)
        if trailing_slash:
            path += '/'
        return path

    def copyfile(self, source, outputfile):
        """Copy all data from ``source`` to ``outputfile`` (file objects)."""
        shutil.copyfileobj(source, outputfile)

    def guess_type(self, path):
        """Return the MIME type for ``path``.

        ``extensions_map`` is consulted first (exact extension, then
        lower-cased), then ``mimetypes``; the fallback is
        ``application/octet-stream``.
        """
        base, ext = posixpath.splitext(path)
        if ext in self.extensions_map:
            return self.extensions_map[ext]
        ext = ext.lower()
        if ext in self.extensions_map:
            return self.extensions_map[ext]
        guess, _ = mimetypes.guess_type(path)
        if guess:
            return guess
        return 'application/octet-stream'

    def do_POST(self):
        """Handle a file upload and answer with a small HTML result page."""
        r, info = self.deal_post_data()
        # Pass the values as arguments: the base class applies ``format %
        # args`` itself, so a pre-formatted message containing '%' (e.g. from
        # a file name) would raise.
        self.log_message('%s, %s => %s', r, info, self.client_address)
        enc = sys.getfilesystemencoding()
        # Both values below are client-controlled and must be escaped before
        # being embedded in markup.  Without a Referer, link to the request
        # path.
        safe_info = html.escape(info, quote=False)
        back_url = html.escape(self.headers.get('referer') or self.path,
                               quote=True)
        res = [
            '<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" '
            '"http://www.w3.org/TR/html4/strict.dtd">',
            '<html>\n<head>',
            '<meta http-equiv="Content-Type" content="text/html; charset=%s">'
            % enc,
            '<title>%s</title>\n</head>' % "Upload Result Page",
            '<body><h1>%s</h1>\n' % "Upload Result",
        ]
        if r:
            res.append('<p>SUCCESS: %s</p>\n' % safe_info)
        else:
            res.append('<p>FAILURE: %s</p>' % safe_info)
        res.append('<a href="%s">back</a>' % back_url)
        res.append('</body></html>')
        encoded = '\n'.join(res).encode(enc, 'surrogateescape')
        self.send_response(HTTPStatus.OK)
        self.send_header("Content-type", "text/html; charset=%s" % enc)
        self.send_header("Content-Length", str(len(encoded)))
        self.end_headers()
        with io.BytesIO(encoded) as f:
            self.copyfile(f, self.wfile)

    def deal_post_data(self):
        """Store the file carried by a ``multipart/form-data`` POST body.

        The body is streamed line by line from ``self.rfile`` into a file
        inside the directory addressed by the request path, so large uploads
        are never held in memory.  Only the first part (form field ``file``)
        is processed.

        Returns:
            ``(True, message)`` on success, ``(False, reason)`` otherwise.
            A truncated upload leaves the partially written file on disk.
        """
        content_type = self.headers.get('content-type')
        found = self._boundary_re.search(content_type or '')
        if not found:
            return False, "Content-Type header doesn't contain boundary"
        boundary = (found.group(1) or found.group(2)).encode()

        try:
            remain_bytes = int(self.headers.get('content-length'))
        except (TypeError, ValueError):
            return False, "Content-Length header is missing or invalid"

        line = self.rfile.readline()
        remain_bytes -= len(line)
        if boundary not in line:
            return False, "Content NOT begin with boundary"

        # Read the part headers up to the blank separator line instead of
        # assuming exactly "Content-Disposition, Content-Type, blank".
        filename = None
        while True:
            line = self.rfile.readline()
            remain_bytes -= len(line)
            if line in (b'\r\n', b'\n', b''):
                break
            found = self._filename_re.search(line.decode('utf-8', 'replace'))
            if found:
                filename = found.group(1)
        if filename is None:
            return False, "Can't find out file name..."

        # The file name is untrusted: keep only its last component so that
        # "../x", "/etc/x" or "C:\\dir\\x" cannot escape the target directory.
        filename = posixpath.basename(filename.replace('\\', '/'))
        if filename in ('', os.curdir, os.pardir):
            return False, "Can't find out file name..."

        path = self.translate_path(self.path)
        fn = os.path.join(path, filename)
        try:
            out = open(fn, 'wb')
        except (OSError, ValueError):
            # ValueError covers names open() rejects outright (embedded NUL).
            return False, ("Can't create file to write, "
                           "do you have permission to write?")

        # Stay one line behind the reader: the line terminator in front of
        # the closing boundary belongs to the multipart framing, not to the
        # file, and can only be recognised once the boundary has been seen.
        with out:
            preline = self.rfile.readline()
            remain_bytes -= len(preline)
            while remain_bytes > 0:
                line = self.rfile.readline()
                if not line:
                    # Peer closed the connection early; without this check
                    # remain_bytes would never reach zero.
                    break
                remain_bytes -= len(line)
                if boundary in line:
                    if preline.endswith(b'\r\n'):
                        preline = preline[:-2]
                    elif preline.endswith(b'\n'):
                        preline = preline[:-1]
                    out.write(preline)
                    return True, "File '%s' upload success!" % fn
                out.write(preline)
                preline = line
        return False, "Unexpect Ends of data."


if __name__ == '__main__':
    try:
        port = int(sys.argv[1])
    except (IndexError, ValueError):
        port = 8000
        print('-------->> Warning: Port is not given, will use deafult port: 8000 ')
        print('-------->> if you want to use other port, please execute: ')
        print('-------->> python SimpleHTTPServerWithUpload.py port ')
        print("-------->> port is a integer and it's range: 1024 < port < 65535 ")
    http.server.test(HandlerClass=SimpleHTTPRequestHandler,
                     ServerClass=http.server.HTTPServer,
                     port=port)

在需要暴露的目录下启动http服务,如/data/codes/

cd /data/codes/
python3 httpserver.py 8888

随后在个人电脑访问http://ip:8888即可浏览文件、上传文件:
在这里插入图片描述

http://www.lryc.cn/news/156457.html

相关文章:

  • Microsoft Edge 主页启动diy以及常用的扩展、收藏夹的网站
  • 文末送书!谈谈原型模式在JAVA实战开发中的应用(附源码+面试题)
  • 视频汇聚/视频云存储/视频监控管理平台EasyCVR启动时打印starting server:listen tcp,该如何解决?
  • 【Linux从入门到精通】通信 | 管道通信(匿名管道 命名管道)
  • 实践和项目:解决实际问题时,选择合适的数据结构和算法
  • 上线检查工具(待完善)
  • PE文件格式详解
  • 【Alibaba中间件技术系列】「RocketMQ技术专题」RocketMQ消息发送的全部流程和落盘原理分析
  • 关于vue首屏加载loading问题
  • 数据库性能测试实践:慢查询统计分析
  • windows wsl ssh 配置流程 Permission denied (publickey)
  • OpenCV(五):图像颜色空间转换
  • 一图胜千言!数据可视化多维讲解(Python)
  • Hbase相关总结
  • C++ Primer Plus第二章编程练习答案
  • Web后端开发(请求响应)上
  • LeetCode 338. Counting Bits【动态规划,位运算】简单
  • 解释 Git 的基本概念和使用方式。
  • 计算机网络初识
  • python 笔记(2)——文件、异常、面向对象、装饰器、json
  • Meta AI的Nougat能够将数学表达式从PDF文件转换为机器可读文本
  • 【Python爬虫笔记】爬虫代理IP与访问控制
  • 50、Spring WebFlux 的 自动配置 的一些介绍,与 Spring MVC 的一些对比
  • 【算法专题突破】双指针 - 和为s的两个数字(6)
  • Redis7入门概述
  • SQL sever命名规范
  • BCSP-玄子Share-Java框基础_工厂模式/代理模式
  • 【数据结构】2015统考真题 6
  • HTML <track> 标签
  • php中识别url被篡改并阻止访问的实现方式是什么