我有一个从中复制文件的应用程序 src
至 dst
:
import shutil
from threading import Thread
t = Thread(target=shutil.copy, args=[ src, dst ]).start()
我希望应用程序每隔5秒查询一次副本的进度,而不会锁定应用程序本身。这可能吗?
我的目的是将这一进展设定为a QtGui.QLabel
向用户提供有关文件副本的反馈。
使用线程shutil文件副本进行复制时可以实现吗?
我有一个从中复制文件的应用程序 src
至 dst
:
import shutil
from threading import Thread
t = Thread(target=shutil.copy, args=[ src, dst ]).start()
我希望应用程序每隔5秒查询一次副本的进度,而不会锁定应用程序本身。这可能吗?
我的目的是将这一进展设定为a QtGui.QLabel
向用户提供有关文件副本的反馈。
使用线程shutil文件副本进行复制时可以实现吗?
shutil.copy()
不提供跟踪进度的任何选项,不。最多可以监控目标文件的大小(使用 os.*
目标文件名上的函数)。
另一种方法是实现自己的复制功能。实施非常简单; shutil.copy()
基本上是一个 shutil.copyfile()
加 shutil.copymode()
呼叫; shutil.copyfile()
反过来将实际工作委托给 shutil.copyfileobj()
(链接到Python源代码)。
实现自己的 shutil.copyfileobj()
包括进展应该是微不足道的;注入对回调函数的支持,以便在每次复制另一个块时报告通知您的程序:
def copyfileobj(fsrc, fdst, callback, length=16*1024):
copied = 0
while True:
buf = fsrc.read(length)
if not buf:
break
fdst.write(buf)
copied += len(buf)
callback(copied)
并比较 copied
文件大小的大小。
我结合了 Martijn Pieters的回答 有一些进度条形码来自 这个答案 修改工作在PyCharm从 这个答案 这给了我以下。功能 copy_with_progress
是我的目标。
import os
import shutil
def progress_percentage(perc, width=None):
# This will only work for python 3.3+ due to use of
# os.get_terminal_size the print function etc.
FULL_BLOCK = '█'
# this is a gradient of incompleteness
INCOMPLETE_BLOCK_GRAD = ['░', '▒', '▓']
assert(isinstance(perc, float))
assert(0. <= perc <= 100.)
# if width unset use full terminal
if width is None:
width = os.get_terminal_size().columns
# progress bar is block_widget separator perc_widget : ####### 30%
max_perc_widget = '[100.00%]' # 100% is max
separator = ' '
blocks_widget_width = width - len(separator) - len(max_perc_widget)
assert(blocks_widget_width >= 10) # not very meaningful if not
perc_per_block = 100.0/blocks_widget_width
# epsilon is the sensitivity of rendering a gradient block
epsilon = 1e-6
# number of blocks that should be represented as complete
full_blocks = int((perc + epsilon)/perc_per_block)
# the rest are "incomplete"
empty_blocks = blocks_widget_width - full_blocks
# build blocks widget
blocks_widget = ([FULL_BLOCK] * full_blocks)
blocks_widget.extend([INCOMPLETE_BLOCK_GRAD[0]] * empty_blocks)
# marginal case - remainder due to how granular our blocks are
remainder = perc - full_blocks*perc_per_block
# epsilon needed for rounding errors (check would be != 0.)
# based on reminder modify first empty block shading
# depending on remainder
if remainder > epsilon:
grad_index = int((len(INCOMPLETE_BLOCK_GRAD) * remainder)/perc_per_block)
blocks_widget[full_blocks] = INCOMPLETE_BLOCK_GRAD[grad_index]
# build perc widget
str_perc = '%.2f' % perc
# -1 because the percentage sign is not included
perc_widget = '[%s%%]' % str_perc.ljust(len(max_perc_widget) - 3)
# form progressbar
progress_bar = '%s%s%s' % (''.join(blocks_widget), separator, perc_widget)
# return progressbar as string
return ''.join(progress_bar)
def copy_progress(copied, total):
print('\r' + progress_percentage(100*copied/total, width=30), end='')
def copyfile(src, dst, *, follow_symlinks=True):
"""Copy data from src to dst.
If follow_symlinks is not set and src is a symbolic link, a new
symlink will be created instead of copying the file it points to.
"""
if shutil._samefile(src, dst):
raise shutil.SameFileError("{!r} and {!r} are the same file".format(src, dst))
for fn in [src, dst]:
try:
st = os.stat(fn)
except OSError:
# File most likely does not exist
pass
else:
# XXX What about other special files? (sockets, devices...)
if shutil.stat.S_ISFIFO(st.st_mode):
raise shutil.SpecialFileError("`%s` is a named pipe" % fn)
if not follow_symlinks and os.path.islink(src):
os.symlink(os.readlink(src), dst)
else:
size = os.stat(src).st_size
with open(src, 'rb') as fsrc:
with open(dst, 'wb') as fdst:
copyfileobj(fsrc, fdst, callback=copy_progress, total=size)
return dst
def copyfileobj(fsrc, fdst, callback, total, length=16*1024):
copied = 0
while True:
buf = fsrc.read(length)
if not buf:
break
fdst.write(buf)
copied += len(buf)
callback(copied, total=total)
def copy_with_progress(src, dst, *, follow_symlinks=True):
if os.path.isdir(dst):
dst = os.path.join(dst, os.path.basename(src))
copyfile(src, dst, follow_symlinks=follow_symlinks)
shutil.copymode(src, dst)
return dst
不,这不可能这样做,因为 shutil.copy
没有任何提供进步的方法。
但是你可以编写自己的复制功能(甚至可以从中分叉代码) shutil
- 注意它是包含链接的模块之一 来源 在顶部,意味着它对于示例代码和仅仅使用as-is一样有用。例如,您的函数可以将进度回调函数作为额外参数,并在每个缓冲区(或每N个缓冲区,或每N个字节,或每N秒)之后调用它。就像是:
def copy(src, dst, progress):
# ...
for something:
progress(bytes_so_far, bytes_total)
# ...
progress(bytes_total, bytes_total)
现在,该回调仍将在后台线程中调用,而不是主线程。对于大多数GUI框架,这意味着它无法直接触摸任何GUI小部件。但是大多数GUI框架都有一种方法可以将消息从后台线程发布到主线程的事件循环中,所以只需让回调就这样做。使用Qt,您可以使用信号和插槽,与主线程中的方式完全相同;如果你不知道如何,那里有很多很棒的教程。
或者,您可以按照建议的方式执行此操作:让主线程信号通知后台线程(例如,通过发布在 queue.Queue
或触发 Event
要么 Condition
)并拥有你的 copy
每次通过循环功能检查该信号并做出响应。但这似乎更复杂,反应更慢。
还有一件事:Qt有自己的线程库,您可能希望使用它而不是Python的本机,因为您可以直接将插槽连接到 QThread
对象,并使你的回调。我不确定,但Qt甚至可能在某处有自己的文件复制进度方法;他们试图结束 一切 平台之间可能完全不同,与GUI模糊相关。
除了Martijn Pieters的优秀回复,如果(像我一样, 我是个白痴你需要弄清楚如何将实际的回调传递给 copyfileobj()
功能,你可以这样做:
def myscopefunction():
### Inside wherever you want to call the copyfileobj() function, you can
### make a nested function like so:
def progress(bytescopied):
updateui(bytescopied) #update your progress bar or whatever
#and then call it like this
copyfileobj(source,destination,progress)
...
这可能有点hacky但它的工作原理:
"""
Copying a file and checking its progress while it's copying.
"""
import os
import shutil
import threading
import time
des = r'<PATH/TO/SPURCE/FILE>'
src = r'<PATH/TO/DESTINATION/FILE>'
def checker(source_path, destination_path):
"""
Compare 2 files till they're the same and print the progress.
:type source_path: str
:param source_path: path to the source file
:type destination_path: str
:param destination_path: path to the destination file
"""
# Making sure the destination path exists
while not os.path.exists(destination_path):
print "not exists"
time.sleep(.01)
# Keep checking the file size till it's the same as source file
while os.path.getsize(source_path) != os.path.getsize(destination_path):
print "percentage", int((float(os.path.getsize(destination_path))/float(os.path.getsize(source_path))) * 100)
time.sleep(.01)
print "percentage", 100
def copying_file(source_path, destination_path):
"""
Copying a file
:type source_path: str
:param source_path: path to the file that needs to be copied
:type destination_path: str
:param destination_path: path to where the file is going to be copied
:rtype: bool
:return: True if the file copied successfully, False otherwise
"""
print "Copying...."
shutil.copyfile(source_path, destination_path)
if os.path.exists(destination_path):
print "Done...."
return True
print "Filed..."
return False
t = threading.Thread(name='copying', target=copying_file, args=(src, des))
# Start the copying on a separate thread
t.start()
# Checking the status of destination file on a separate thread
b = threading.Thread(name='checking', target=checker, args=(src, des))
b.start()