我正在尝试写jpeg压缩,但是,我遇到了一些麻烦
当我运行代码时,错误是: cur_huffman_res_bitarray = huffman_table_AC[cur_huffman_key_tuple] 键错误:(28, 1)
Huffman_table 的最大值是 15,所以像 16,1 18,2 22,3 这样的元组会出错。
不知道怎么修改
import numpy as np
from DCT_quant import *
from padding_image import *
from zigzag import *
def block_generator(padded_matrix):
block = np.empty( (8,8), dtype=int) # 产生一个8*8的空矩阵
block_index = -1
for block_row_index in range( 0, padded_matrix.shape[0], 8): # 步长为8
for block_col_index in range( 0, padded_matrix.shape[1], 8):
block = padded_matrix[block_row_index:block_row_index+8, block_col_index:block_col_index+8] # 逐个获取padded_matrix的8*8分块矩阵
block_index += 1
yield (block_row_index, block_col_index, block ,block_index) # yield是干什么的??
def dct_quant_and_extract_DC_AC_from_padded_matrix(padded_matrix,quant_table_type):
block_total = int( padded_matrix.size/64 ) # 8*8的矩阵块数
dc = np.empty( block_total, dtype=int ) # dc系数, 大小为 block_total*1
ac_arrays = np.empty((block_total,63),dtype=int) # ac系数, 大小为 block_total*63
tmp_array = np.empty(64,dtype=int)
for( block_row_index, block_col_index, block, block_index ) in block_generator(padded_matrix):
quanted_block = quant_block(DCT_2D(block),quant_table_type) # 量化矩阵
tmp_array = zigzag_block_to_array(quanted_block) # 游格编码后的数组
dc[block_index] = tmp_array[0] # dc系数
ac_arrays[block_index]=tmp_array[1:64] # ac系数,每一个ac系数是一个一维数组
return dc,ac_arrays
def restore_padded_matrix_from_DC_AC(dc,ac_arrays,block_row_total, block_col_total,type):
restored_matrix = np.empty((8*block_row_total,8*block_col_total),dtype=int)
tmp_block = np.empty((8,8),dtype=int)
for row in range(0, block_row_total):
for col in range(0, block_col_total):
block_index = row*block_col_total + col # 第几个8*8块
tmp_block = zigzag_array_to_block( np.concatenate((dc[block_index:block_index+1],ac_arrays[block_index])) ) # 将dc系数和ac系数合并 将这个数组逆zigzag变换,变成8*8的数组
aaa = dequant_block(tmp_block,type)
restored_matrix[row*8:row*8+8,col*8:col*8+8] = iDCT_2D(aaa)
return restored_matrix
import numpy as np
from scipy import fftpack
def DCT_2D(block):
DCT_matrix = fftpack.dct( fftpack.dct(block.T, norm='ortho').T, norm='ortho' )
return DCT_matrix
def iDCT_2D(DCT_block):
block = fftpack.idct(fftpack.idct(DCT_block.T,norm='ortho').T,norm='ortho')
return block
def get_quantization_table(type):
if(type == 'lum'): # 亮度量化表
table = np.array(
[ [16, 11, 10, 16, 24, 40, 51, 61],
[12, 12, 14, 19, 26, 58, 60, 55],
[14, 13, 16, 24, 40, 57, 69, 56],
[14, 17, 22, 29, 51, 87, 80, 62],
[18, 22, 37, 56, 68, 109, 103, 77],
[24, 35, 55, 64, 81, 104, 113, 92],
[49, 64, 78, 87, 103, 121, 120, 101],
[72, 92, 95, 98, 112, 100, 103, 99] ]
)
elif(type == 'chrom'): #Cb Cr量化表
table = np.array(
[ [17, 18, 24, 47, 99, 99, 99, 99],
[18, 21, 26, 66, 99, 99, 99, 99],
[24, 26, 56, 99, 99, 99, 99, 99],
[47, 66, 99, 99, 99, 99, 99, 99],
[99, 99, 99, 99, 99, 99, 99, 99],
[99, 99, 99, 99, 99, 99, 99, 99],
[99, 99, 99, 99, 99, 99, 99, 99],
[99, 99, 99, 99, 99, 99, 99, 99] ]
)
return table
def quant_block(block, type):
quant_table = get_quantization_table(type) # 得到量化表
quanted_block = np.round(np.divide(block, quant_table)).astype(int)
return quanted_block
def dequant_block(block, type):
quant_table = get_quantization_table(type) # 得到量化表
dequanted_block = block*quant_table.astype(np.int32) # 对应分量相乘,然后取整
return dequanted_block
def DPCM(dc):
dpcm_arr = np.empty(dc.shape,dtype=dc.dtype)
dpcm_arr[0] = dc[0]
for i in range(1,dc.shape[0]):
dpcm_arr[i] = dc[i]-dc[i-1]
return dpcm_arr
def DPCM_decode(dpcm_arr):
dc_restored = np.empty(dpcm_arr.shape,dtype=dpcm_arr.dtype)
dc_restored[0] = dpcm_arr[0]
for i in range(1,dpcm_arr.shape[0]):
dc_restored[i] = dpcm_arr[i] + dc_restored[i-1]
return dc_restored
def decompose_int_to_size_value(n):
if( n == 0 ):
size = 0
value_bitarray = bitarray()
else:
size = len( bin(abs(n)) ) - 2 # 将n转换为2进制 bin(abs(n))后的格式如0b101010,size为n采用二进制编码后的比特数
flag = np.sign(n) # n<0,返回-1; n=0,返回0; n>0,返回1
abs_bitarray = bitarray( bin(abs(n))[2:] ) # 把开头的0b去掉
if( flag == -1):
value_bitarray = flip_bitarray(abs_bitarray) # n<0时翻转二进制数组
else:
value_bitarray = abs_bitarray
return (size, value_bitarray)
def compose_size_value_to_int(size,value_bitarray):
if(size == 0):
restored_int_val = 0
else:
if( value_bitarray[0] == 1 ): # 说明是正数
restored_int_val = int(value_bitarray[:size].to01(),2) #截取0到size下标的二进制,能解释传入的value_bitarray贼长
elif( value_bitarray[0] == 0): # 负数
restored_int_val = - int(flip_bitarray(value_bitarray[:size]).to01(),2)
return restored_int_val
def flip_bitarray(barr):
converted_str = barr.to01()
flipped_str = ''
for i in converted_str:
if (i=='0'): flipped_str+='1'
else: flipped_str+='0'
return bitarray(flipped_str)
def encode_DC_entropy_all(dpcm_arr):
size_bitarray = bitarray()
value_bitarray = bitarray()
for n in dpcm_arr:
cur_size, cur_value_bitarray = decompose_int_to_size_value(n) # cur_size:n所占的比特位数 cur_value_bitarray:二进制数组序列
cur_size_bitarray = huffman_table_DC[hex(cur_size)[2:]] # hex先转换为16进制后再把开头的0x去掉,由DC哈夫曼表得到cur_size的编码
size_bitarray += cur_size_bitarray
value_bitarray += cur_value_bitarray
return size_bitarray, value_bitarray
def decode_DC_entropy_all(size_bitarray,value_bitarray):
size_decoded_list = size_bitarray.decode(huffman_table_DC) # 将二进制序列的数组 根据DC哈夫曼表 还原为16进制的数组
DC_numbers = len(size_decoded_list) # DC分量数组的长度
dpcm_arr_restored = np.empty(DC_numbers, dtype=int)
index = 0
for str_cur_size in size_decoded_list:
cur_size = int(str_cur_size,16)
restored_val = compose_size_value_to_int(cur_size,value_bitarray) #根据size和bitarray还原成int值,这里的value_bitarray包含此处的值和后面的所有制,函数里面做了截取
dpcm_arr_restored[index] = restored_val
value_bitarray = value_bitarray[cur_size:] # 除掉已经运算之后的
index += 1
return dpcm_arr_restored
def RLE(ac):
rle_list = []
consecutive_0s = 0 # 连续0的数量
for ac_coefficient in ac:
if( ac_coefficient == 0 ):
consecutive_0s += 1
else:
rle_list.append( (consecutive_0s,ac_coefficient) )
consecutive_0s = 0 # bug 给错了
# 不要append(0,0) 直接把他放入huffman_ac表里面
return rle_list
def RLE_decode(rle):
ac_restored_lst = []
for consecutive_0s,number in rle:
ac_restored_lst.extend([0]*consecutive_0s)
ac_restored_lst.append(number)
ac_restored_lst.extend([0]*(63-len(ac_restored_lst))) #末尾补上0
ac_restored_array = np.array(ac_restored_lst)
return ac_restored_array
def decompose_RLE_list_to_huffmanResBitarray_valueBitarray(rle_list):
huffman_res_bitarray = bitarray()
value_bitarray = bitarray()
for item in rle_list:
consecutive_0s = item[0] # 0的个数
(cur_size, cur_value_bitarray) = decompose_int_to_size_value(item[1])
cur_huffman_key_tuple = (consecutive_0s,cur_size) # key值 0的个数和当前值的二进制比特数构成tuple
cur_huffman_res_bitarray = huffman_table_AC[cur_huffman_key_tuple]
huffman_res_bitarray += cur_huffman_res_bitarray
value_bitarray += cur_value_bitarray # 之后根据这个确定取几个bit
huffman_res_bitarray += bitarray('1010') # 表示block编码的结束
return huffman_res_bitarray, value_bitarray
def All_block_huffmanResbitarray_valueBitarray_to_RLE_lists(all_block_huffman_res_bitarray,all_block_value_bitarray):
all_block_rle_lists = []
value_bitarray_left = all_block_value_bitarray
cur_block_index = 0
cur_block_RLE_lst = []
AC_huffman_decode_res = all_block_huffman_res_bitarray.decode(huffman_table_AC) #得到形如 [(0,1),(2,0),...]的表达形式
for cur_consecutive_0s,cur_size in AC_huffman_decode_res:
if((cur_consecutive_0s,cur_size) == (0,0)): # reaching end of 1 8x8 block
all_block_rle_lists.append(cur_block_RLE_lst)
cur_block_index += 1
cur_block_RLE_lst = []
continue
restored_val = compose_size_value_to_int(cur_size,value_bitarray_left)
cur_block_RLE_lst.append([cur_consecutive_0s,restored_val])
value_bitarray_left = value_bitarray_left[cur_size:]
return all_block_rle_lists
from PIL import Image
import numpy as np
from DCT_quant import *
from zigzag import *
import math
def martix_padding(npmat):
rows = npmat.shape[0] # 获取行数
rows_pad = math.ceil( rows/8 ) * 8
cols = npmat.shape[1] # 获取列数
cols_pad = math.ceil( cols/8 ) * 8
mat_append_col = np.broadcast_to( npmat[:,-1][:,None], (npmat.shape[0],cols_pad - npmat.shape[1]) ) # 要填充的元素
mat_padded_col = np.hstack( (npmat,mat_append_col) ) # 此时 矩阵的列数满足8的倍数
mat_append_row = np.broadcast_to(mat_padded_col[-1,:],(rows_pad - mat_padded_col.shape[0],cols_pad)) # 要填充的元素
mat_padded_row = np.vstack((mat_padded_col,mat_append_row)) # 此时 矩阵的行数也满足8的倍数
mat_pad = mat_padded_row
return mat_pad
def restore_img_from_padding(Y_padded,Cb_padded,Cr_padded,rows_origin,cols_origin):
Cb_restored = np.repeat((np.repeat(Cb_padded,2,axis=1)),2,axis=0) ## 第一个axis有问题
Cr_restored = np.repeat((np.repeat(Cr_padded,2,axis=1)),2,axis=0)
npmat_restored = np.empty( [rows_origin,cols_origin,3], dtype=np.uint8)
npmat_restored[:,:,0] = Y_padded[ :rows_origin, :cols_origin]
npmat_restored[:,:,1] = Cb_restored[:rows_origin, :cols_origin]
npmat_restored[:,:,2] = Cr_restored[:rows_origin, :cols_origin]
return npmat_restored
from pickletools import uint8
import numpy as np
def zigzag_sequence_generator(n):
'''
生成zigzag编码序列,返回的generator的item形如[2,3] \n
具体走的方向、判断0和7看zigzag的图
'''
point = np.array([0,0])
MOVING_RIGHT,MOVING_DOWN_LEFT,MOVING_DOWN,MOVING_UP_RIGHT = range(4) #4种状态
state = MOVING_RIGHT
for i in range(n*n):
yield (i, point) # 每一个项是 第几个 和 点的值
if( state == MOVING_RIGHT ):
point += [0,1]
if( point[0] == 0 ):
state = MOVING_DOWN_LEFT
elif( point[0] == 7 ):
state = MOVING_UP_RIGHT
elif( state == MOVING_DOWN_LEFT ):
point += [1,-1]
if( point[0] == 7 ):
state = MOVING_RIGHT
elif( point[1] == 0 ):
state = MOVING_DOWN
elif( state == MOVING_DOWN ):
point += [1,0]
if( point[1] == 0 ):
state = MOVING_UP_RIGHT
elif( point[1] == 7 ):
state = MOVING_DOWN_LEFT
elif( state == MOVING_UP_RIGHT ):
point += [-1,1]
if( point[1] == 7 ):
state = MOVING_DOWN
elif( point[0] ==0 ):
state = MOVING_RIGHT
n = 8
zigzag_sequence_array = np.empty( (n*n, 2), dtype=int)
# 生成zigzag数组 item项形如[2,3]
for (i,point) in zigzag_sequence_generator(n):
zigzag_sequence_array[i] = point
def zigzag_block_to_array(block_quant_known):
'''
将量化之后的8*8矩阵 根据zigzag编码 转换为一维数组
'''
array = np.empty(n*n, dtype=int)
for i,point in enumerate(zigzag_sequence_array): #enumerate 数组转化为对象
array[i] = block_quant_known[tuple(point)]
return array
def zigzag_array_to_block(array_known):
'''
将zigzag编码后的一维数组 转化为 8*8的矩阵块
'''
block = np.empty((n,n), dtype=int)
for i,point in enumerate(zigzag_sequence_array):
block[tuple(point)] = array_known[i]
return block
import argparse
import numpy as np
from bitarray import bitarray
from bitarray.util import ba2int
from PIL import Image
from DC_AC_extract import dct_quant_and_extract_DC_AC_from_padded_matrix
from entropy_encoding import DPCM, RLE, decompose_RLE_list_to_huffmanResBitarray_valueBitarray, encode_DC_entropy_all,decode_DC_entropy_all
from padding_image import *
def main():
# 输入格式 python encoder.py 1.jpg 123
parser = argparse.ArgumentParser()
parser.add_argument("image_to_compress", help="path to the input image")
parser.add_argument("compressed_file", help="path to the output compressed file")
args = parser.parse_args()
input_image_path = args.image_to_compress
output_image_path = args.compressed_file
image_to_compress = Image.open(input_image_path) # 读入图片
# 1. 图片由RGB转换为YCbCr
ycbcr = image_to_compress.convert('YCbCr') # RGB转换为YCbCr
npmat = np.array(ycbcr, dtype=int) - 128 # 归一化处理,每一个分量的范围-128~127, npmat:width * height * 3
# print(npmat)
rows,cols = npmat.shape[0],npmat.shape[1] # 像素的行数和列数
# 2. 色度缩减取样 subsampling(4:2:0) + padding
'''
这个步骤中,需要进行2*2的矩阵分块,
之后矩阵大小变为 rows/2 和 cols/2,
而dct中需要切割8*8的矩阵块,则 rows/2和cols/2需要是8的倍数
故:rows和cols是16的倍数
方法1:直接把原图像的rows和cols变为16的倍数
方法2:Y,Cb,Cr分离,对于Y只需要8的倍数,而Cb和Cr先采样,再变为8的倍数
demo:7*7大小的矩阵,采样后变为4*4,然后变为8*8;
'''
y = npmat[:,:,0] # 取每一个元素的第三个分量 y:width * height * 1,YCbCr中的Y分量
Cb = npmat[::2,::2,1] # 行数和列数步长为2
Cr = npmat[::2,::2,2] # 行数和列数步长为2
Y_padded = martix_padding(y) # 将矩阵补充成8的倍数
Cb_padded = martix_padding(Cb) # 将矩阵补充成8的倍数
Cr_padded = martix_padding(Cr) # 将矩阵补充成8的倍数
# 3. dct变换 + quant量化 + dc/ac提取
# 提取dc和ac系数
dc_y,ac_arrays_y = dct_quant_and_extract_DC_AC_from_padded_matrix(Y_padded, 'lum')
dc_cb,ac_arrays_cb = dct_quant_and_extract_DC_AC_from_padded_matrix(Cb_padded, 'chrom')
dc_cr,ac_arrays_cr = dct_quant_and_extract_DC_AC_from_padded_matrix(Cr_padded, 'chrom')
# dpcm + dc的熵编码
# 差分编码
dpcm_y = DPCM(dc_y)
dpcm_cb = DPCM(dc_cb)
dpcm_cr = DPCM(dc_cr)
# dc差分编码
size_bitarray_dc_y, value_bitarray_dc_y = encode_DC_entropy_all(dpcm_y)
size_bitarray_dc_cb, value_bitarray_dc_cb = encode_DC_entropy_all(dpcm_cb)
size_bitarray_dc_cr, value_bitarray_dc_cr = encode_DC_entropy_all(dpcm_cr)
# 对于每一个8*8矩阵块的ac系数进行RLE行程编码,然后通过比特编码联合在一起
huffman_res_bitarray_ac_y = bitarray()
value_res_bitarray_ac_y = bitarray()
for ac_y in ac_arrays_y:
tmp_RLE_res = RLE(ac_y)
huffman_tmp_bitarray_ac_y, value_tmp_bitarray_ac_y = decompose_RLE_list_to_huffmanResBitarray_valueBitarray(tmp_RLE_res)
huffman_res_bitarray_ac_y += huffman_tmp_bitarray_ac_y
value_res_bitarray_ac_y += value_tmp_bitarray_ac_y
huffman_res_bitarray_ac_cb = bitarray()
value_res_bitarray_ac_cb = bitarray()
for ac_cb in ac_arrays_cb:
tmp_RLE_res = RLE(ac_cb)
huffman_tmp_bitarray_ac_cb, value_tmp_bitarray_ac_cb = decompose_RLE_list_to_huffmanResBitarray_valueBitarray(tmp_RLE_res)
huffman_res_bitarray_ac_cb += huffman_tmp_bitarray_ac_cb
value_res_bitarray_ac_cb += value_tmp_bitarray_ac_cb
huffman_res_bitarray_ac_cr = bitarray()
value_res_bitarray_ac_cr = bitarray()
for ac_cr in ac_arrays_cr:
tmp_RLE_res = RLE(ac_cr)
huffman_tmp_bitarray_ac_cr, value_tmp_bitarray_ac_cr = decompose_RLE_list_to_huffmanResBitarray_valueBitarray(tmp_RLE_res)
huffman_res_bitarray_ac_cr += huffman_tmp_bitarray_ac_cr
value_res_bitarray_ac_cr += value_tmp_bitarray_ac_cr
with open(output_image_path, 'wb') as outFile:
bitarray_lst = [
size_bitarray_dc_y,value_bitarray_dc_y,\
size_bitarray_dc_cb,value_bitarray_dc_cb,\
size_bitarray_dc_cr,value_bitarray_dc_cr,\
huffman_res_bitarray_ac_y,value_res_bitarray_ac_y,\
huffman_res_bitarray_ac_cb,value_res_bitarray_ac_cb,\
huffman_res_bitarray_ac_cr,value_res_bitarray_ac_cr
]
write_bitarray = bitarray()
rows_barr = bitarray( format(rows, '#018b')[2:] )
cols_barr = bitarray( format(cols, '#018b')[2:] )
write_bitarray += rows_barr
write_bitarray += cols_barr
print("行数:"+ format(rows, '#018b')[2:] + " 列数:" + format(cols, '#018b')[2:])
for barr in bitarray_lst:
cur_bit_len_barr = bitarray( format(len(barr), '#034b')[2:] )
write_bitarray += cur_bit_len_barr
for barr in bitarray_lst:
write_bitarray += barr
write_bitarray.tofile(outFile)
if __name__ == "__main__":
main()