我正在寻找一种定义自定义的方法 quoting
同 csv.writer
在Python中。有4种内置的方法来评估值:
csv.QUOTE_ALL, csv.QUOTE_MINIMAL, csv.QUOTE_NONNUMERIC, csv.QUOTE_NONE
但是我需要一个能模仿Postgres的引用机制 FORCE QUOTE *
,即它将引用所有非None值。同 csv.QUOTE_ALL
Python会将None变为 ''
但我想换空字符串。
内置是否可以做到这一点 csv
模块(我对黑客不感兴趣,我已经这样做了:P)?或者我被迫写/获得一些自定义的csv解析器?
一般来说:是否可以编写自定义引用机制 csv
模块?
禁用 csv
自己引用并添加引号:
def quote(col):
if col is None:
return ''
# uses double-quoting style to escape existing quotes
return '"{}"'.format(str(col).replace('"', '""'))
writer = csv.writer(fileobj, quoting=csv.QUOTE_NONE, escapechar='', quotechar='')
for row in rows:
writer.writerow(map(quote, row))
通过设置两者 escapechar
和 quotechar
要清空字符串,请避免模块引用已经引用的值。
以上工作只要你不使用 分隔符 在csv值。
请注意,到目前为止,您自己编写逗号分隔的行会更容易:
with open(filename, 'w'), fd:
for row in rows:
fd.write(','.join(map(quote, row)) + '\r\n')
禁用 csv
自己引用并添加引号:
def quote(col):
if col is None:
return ''
# uses double-quoting style to escape existing quotes
return '"{}"'.format(str(col).replace('"', '""'))
writer = csv.writer(fileobj, quoting=csv.QUOTE_NONE, escapechar='', quotechar='')
for row in rows:
writer.writerow(map(quote, row))
通过设置两者 escapechar
和 quotechar
要清空字符串,请避免模块引用已经引用的值。
以上工作只要你不使用 分隔符 在csv值。
请注意,到目前为止,您自己编写逗号分隔的行会更容易:
with open(filename, 'w'), fd:
for row in rows:
fd.write(','.join(map(quote, row)) + '\r\n')
我写了自己的csv编写器,它完全符合我的要求:
class PostgresCSVWriter(object):
def __init__(self, stream, quotechar="\"", delimiter=",", escapechar="\\"):
self.stream = stream
self.quotechar = quotechar
self.delimiter = delimiter
self.escapechar = escapechar
self.buffer_size = 16384
def _convert_value(self, obj):
if obj is None:
return ""
value = str(obj)
value = value.replace(self.quotechar, self.quotechar+self.quotechar)
value = value.replace(self.delimiter, self.escapechar+self.delimiter)
return self.quotechar+value+self.quotechar
def _convert_row(self, row):
return self.delimiter.join(self._convert_value(v) for v in row) + "\r\n"
def writerow(self, row):
self.stream.write(self._convert_row(row))
def writerows(self, rows):
data = ""
counter = 0
for row in rows:
buf = self._convert_row(row)
data += buf
counter += len(buf)
if counter >= self.buffer_size:
self.stream.write(data)
data = ""
counter = 0
if data:
self.stream.write(data)
如果有人发现任何问题,请告诉我。我还在寻找解决方案 csv
虽然模块。