jpeg压缩霍夫曼表

问题描述 投票:0回答:0

我正在尝试写jpeg压缩,但是,我遇到了一些麻烦

当我运行代码时,错误是: cur_huffman_res_bitarray = huffman_table_AC[cur_huffman_key_tuple] 键错误:(28, 1)

Huffman_table 的最大值是 15,所以像 16,1 18,2 22,3 这样的元组会出错。

不知道怎么修改

import numpy as np 
from DCT_quant import *
from padding_image import *
from zigzag import *

def block_generator(padded_matrix):
  block = np.empty( (8,8), dtype=int) # 产生一个8*8的空矩阵
  block_index = -1
  for block_row_index in range( 0, padded_matrix.shape[0], 8): # 步长为8
    for block_col_index in range( 0, padded_matrix.shape[1], 8):
      block = padded_matrix[block_row_index:block_row_index+8, block_col_index:block_col_index+8] # 逐个获取padded_matrix的8*8分块矩阵
      block_index += 1
      yield (block_row_index, block_col_index, block ,block_index) # yield是干什么的??


def dct_quant_and_extract_DC_AC_from_padded_matrix(padded_matrix,quant_table_type):
  block_total = int( padded_matrix.size/64 ) # 8*8的矩阵块数
  dc = np.empty( block_total, dtype=int ) # dc系数, 大小为 block_total*1
  ac_arrays = np.empty((block_total,63),dtype=int) # ac系数, 大小为 block_total*63
  tmp_array = np.empty(64,dtype=int)

  for( block_row_index, block_col_index, block, block_index ) in block_generator(padded_matrix):
    quanted_block = quant_block(DCT_2D(block),quant_table_type) # 量化矩阵
    tmp_array = zigzag_block_to_array(quanted_block) # 游格编码后的数组
    dc[block_index] = tmp_array[0] # dc系数
    ac_arrays[block_index]=tmp_array[1:64] # ac系数,每一个ac系数是一个一维数组
  
  return dc,ac_arrays

def restore_padded_matrix_from_DC_AC(dc,ac_arrays,block_row_total, block_col_total,type):
  restored_matrix = np.empty((8*block_row_total,8*block_col_total),dtype=int)
  tmp_block = np.empty((8,8),dtype=int)

  for row in range(0, block_row_total):
    for col in range(0, block_col_total):
      block_index = row*block_col_total + col # 第几个8*8块
      tmp_block = zigzag_array_to_block( np.concatenate((dc[block_index:block_index+1],ac_arrays[block_index])) ) # 将dc系数和ac系数合并 将这个数组逆zigzag变换,变成8*8的数组
      aaa = dequant_block(tmp_block,type)
      restored_matrix[row*8:row*8+8,col*8:col*8+8] = iDCT_2D(aaa)

  return restored_matrix

import numpy as np
from scipy import fftpack

def DCT_2D(block):
  DCT_matrix = fftpack.dct( fftpack.dct(block.T, norm='ortho').T, norm='ortho' )
  return DCT_matrix

def iDCT_2D(DCT_block):
  block = fftpack.idct(fftpack.idct(DCT_block.T,norm='ortho').T,norm='ortho')
  return block

def get_quantization_table(type):
  if(type == 'lum'): # 亮度量化表
    table = np.array(
      [ [16, 11, 10, 16, 24,    40,  51,    61],
        [12, 12, 14, 19, 26,    58,  60,    55],
        [14, 13, 16, 24, 40,    57,  69,    56],
        [14, 17, 22, 29, 51,    87,  80,    62],
        [18, 22, 37, 56, 68,    109, 103, 77],
        [24, 35, 55, 64, 81,    104, 113, 92],
        [49, 64, 78, 87, 103, 121, 120, 101],
        [72, 92, 95, 98, 112, 100, 103, 99] ]
    )
  elif(type == 'chrom'): #Cb Cr量化表
    table = np.array(
      [ [17, 18, 24, 47, 99, 99, 99, 99],
        [18, 21, 26, 66, 99, 99, 99, 99],
        [24, 26, 56, 99, 99, 99, 99, 99],
        [47, 66, 99, 99, 99, 99, 99, 99],
        [99, 99, 99, 99, 99, 99, 99, 99],
        [99, 99, 99, 99, 99, 99, 99, 99],
        [99, 99, 99, 99, 99, 99, 99, 99],
        [99, 99, 99, 99, 99, 99, 99, 99] ]
    )
  return table

def quant_block(block, type):
  quant_table = get_quantization_table(type) # 得到量化表
  quanted_block = np.round(np.divide(block, quant_table)).astype(int)
  return quanted_block

def dequant_block(block, type):
  quant_table = get_quantization_table(type) # 得到量化表
  dequanted_block = block*quant_table.astype(np.int32) # 对应分量相乘,然后取整
  return dequanted_block

def DPCM(dc):
  dpcm_arr = np.empty(dc.shape,dtype=dc.dtype)
  dpcm_arr[0] = dc[0]
  
  for i in range(1,dc.shape[0]):
    dpcm_arr[i] = dc[i]-dc[i-1]
  
  return dpcm_arr

def DPCM_decode(dpcm_arr):
  dc_restored = np.empty(dpcm_arr.shape,dtype=dpcm_arr.dtype)
  dc_restored[0] = dpcm_arr[0]
  for i in range(1,dpcm_arr.shape[0]):
    dc_restored[i] = dpcm_arr[i] + dc_restored[i-1]
  return dc_restored
  
def decompose_int_to_size_value(n):
  if( n == 0 ):
    size = 0
    value_bitarray = bitarray()
  else:
    size = len( bin(abs(n)) ) - 2 # 将n转换为2进制 bin(abs(n))后的格式如0b101010,size为n采用二进制编码后的比特数
    flag = np.sign(n) # n<0,返回-1; n=0,返回0; n>0,返回1
    abs_bitarray = bitarray( bin(abs(n))[2:] ) # 把开头的0b去掉
    if( flag == -1):
      value_bitarray = flip_bitarray(abs_bitarray) # n<0时翻转二进制数组
    else:
      value_bitarray = abs_bitarray
  return (size, value_bitarray)

def compose_size_value_to_int(size,value_bitarray):
  if(size == 0): 
    restored_int_val = 0
  else:
    if( value_bitarray[0] == 1 ): # 说明是正数
      restored_int_val =  int(value_bitarray[:size].to01(),2) #截取0到size下标的二进制,能解释传入的value_bitarray贼长
    elif( value_bitarray[0] == 0): # 负数
      restored_int_val = - int(flip_bitarray(value_bitarray[:size]).to01(),2)

  return restored_int_val

def flip_bitarray(barr):
  converted_str = barr.to01()
  flipped_str = ''
  for i in converted_str:
    if (i=='0'): flipped_str+='1'
    else: flipped_str+='0'
  return bitarray(flipped_str)

def encode_DC_entropy_all(dpcm_arr):
  size_bitarray = bitarray()
  value_bitarray = bitarray()

  for n in dpcm_arr:
    cur_size, cur_value_bitarray = decompose_int_to_size_value(n) # cur_size:n所占的比特位数 cur_value_bitarray:二进制数组序列
    cur_size_bitarray = huffman_table_DC[hex(cur_size)[2:]] # hex先转换为16进制后再把开头的0x去掉,由DC哈夫曼表得到cur_size的编码
    size_bitarray += cur_size_bitarray
    value_bitarray += cur_value_bitarray
  
  return size_bitarray, value_bitarray

def decode_DC_entropy_all(size_bitarray,value_bitarray):
  size_decoded_list = size_bitarray.decode(huffman_table_DC) # 将二进制序列的数组 根据DC哈夫曼表 还原为16进制的数组
  DC_numbers = len(size_decoded_list) # DC分量数组的长度
  dpcm_arr_restored = np.empty(DC_numbers, dtype=int)
  
  index = 0
  for str_cur_size in size_decoded_list:
    cur_size = int(str_cur_size,16)
    restored_val = compose_size_value_to_int(cur_size,value_bitarray) #根据size和bitarray还原成int值,这里的value_bitarray包含此处的值和后面的所有制,函数里面做了截取
    dpcm_arr_restored[index] = restored_val
    value_bitarray = value_bitarray[cur_size:] # 除掉已经运算之后的
    index += 1

  return dpcm_arr_restored


def RLE(ac):
  rle_list = []
  consecutive_0s = 0 # 连续0的数量
  
  for ac_coefficient in ac:
    if( ac_coefficient == 0 ):
      consecutive_0s += 1
    else:
      rle_list.append( (consecutive_0s,ac_coefficient) )
      consecutive_0s = 0 # bug 给错了
    # 不要append(0,0) 直接把他放入huffman_ac表里面
  
  return rle_list


def RLE_decode(rle):
  ac_restored_lst = []
 
  for consecutive_0s,number in rle:
    ac_restored_lst.extend([0]*consecutive_0s)
    ac_restored_lst.append(number)
  
  ac_restored_lst.extend([0]*(63-len(ac_restored_lst))) #末尾补上0
  ac_restored_array = np.array(ac_restored_lst)
  
  return ac_restored_array


def decompose_RLE_list_to_huffmanResBitarray_valueBitarray(rle_list):
  huffman_res_bitarray = bitarray()
  value_bitarray = bitarray()

  for item in rle_list:
    consecutive_0s = item[0] # 0的个数
    (cur_size, cur_value_bitarray) = decompose_int_to_size_value(item[1])
    cur_huffman_key_tuple = (consecutive_0s,cur_size) # key值 0的个数和当前值的二进制比特数构成tuple
    cur_huffman_res_bitarray = huffman_table_AC[cur_huffman_key_tuple]
    huffman_res_bitarray += cur_huffman_res_bitarray
    value_bitarray += cur_value_bitarray # 之后根据这个确定取几个bit
  
  huffman_res_bitarray += bitarray('1010') # 表示block编码的结束

  return huffman_res_bitarray, value_bitarray


def All_block_huffmanResbitarray_valueBitarray_to_RLE_lists(all_block_huffman_res_bitarray,all_block_value_bitarray):
  all_block_rle_lists = []
  value_bitarray_left = all_block_value_bitarray
  cur_block_index = 0
  cur_block_RLE_lst = []
  AC_huffman_decode_res = all_block_huffman_res_bitarray.decode(huffman_table_AC) #得到形如 [(0,1),(2,0),...]的表达形式
  
  for cur_consecutive_0s,cur_size in AC_huffman_decode_res:
    if((cur_consecutive_0s,cur_size) == (0,0)): # reaching end of 1 8x8 block
      all_block_rle_lists.append(cur_block_RLE_lst)
      cur_block_index += 1
      cur_block_RLE_lst = []
      continue
    restored_val = compose_size_value_to_int(cur_size,value_bitarray_left)
    cur_block_RLE_lst.append([cur_consecutive_0s,restored_val])
    value_bitarray_left = value_bitarray_left[cur_size:]
    
  return all_block_rle_lists


from PIL import Image
import numpy as np
from DCT_quant import *
from zigzag import *
import math

def martix_padding(npmat):
  rows = npmat.shape[0] # 获取行数
  rows_pad = math.ceil( rows/8 ) * 8

  cols = npmat.shape[1] # 获取列数
  cols_pad = math.ceil( cols/8 ) * 8


  mat_append_col = np.broadcast_to( npmat[:,-1][:,None], (npmat.shape[0],cols_pad - npmat.shape[1]) ) # 要填充的元素
  mat_padded_col = np.hstack( (npmat,mat_append_col) ) # 此时 矩阵的列数满足8的倍数

  mat_append_row = np.broadcast_to(mat_padded_col[-1,:],(rows_pad - mat_padded_col.shape[0],cols_pad)) # 要填充的元素
  mat_padded_row = np.vstack((mat_padded_col,mat_append_row)) # 此时 矩阵的行数也满足8的倍数

  mat_pad = mat_padded_row

  return mat_pad

def restore_img_from_padding(Y_padded,Cb_padded,Cr_padded,rows_origin,cols_origin):
  
  Cb_restored = np.repeat((np.repeat(Cb_padded,2,axis=1)),2,axis=0)  ## 第一个axis有问题
  Cr_restored = np.repeat((np.repeat(Cr_padded,2,axis=1)),2,axis=0)
  npmat_restored = np.empty( [rows_origin,cols_origin,3], dtype=np.uint8)
  npmat_restored[:,:,0] = Y_padded[   :rows_origin, :cols_origin]
  npmat_restored[:,:,1] = Cb_restored[:rows_origin, :cols_origin]
  npmat_restored[:,:,2] = Cr_restored[:rows_origin, :cols_origin]

  return npmat_restored

from pickletools import uint8
import numpy as np


def zigzag_sequence_generator(n):
  '''
    生成zigzag编码序列,返回的generator的item形如[2,3] \n
    具体走的方向、判断0和7看zigzag的图
  '''
  point = np.array([0,0])
  MOVING_RIGHT,MOVING_DOWN_LEFT,MOVING_DOWN,MOVING_UP_RIGHT = range(4) #4种状态
  state = MOVING_RIGHT

  for i in range(n*n):
    yield (i, point) # 每一个项是 第几个 和 点的值
    if( state == MOVING_RIGHT ):
      point += [0,1]
      if( point[0] == 0 ):
        state = MOVING_DOWN_LEFT
      elif( point[0] == 7 ):
        state = MOVING_UP_RIGHT
  
    elif( state == MOVING_DOWN_LEFT ):
      point += [1,-1]
      if( point[0] == 7 ):
        state = MOVING_RIGHT
      elif( point[1] == 0 ):
        state = MOVING_DOWN
    
    elif( state == MOVING_DOWN ):
      point += [1,0]
      if( point[1] == 0 ):
        state = MOVING_UP_RIGHT
      elif( point[1] == 7 ):
        state = MOVING_DOWN_LEFT
    
    elif( state == MOVING_UP_RIGHT ):
      point += [-1,1]
      if( point[1] == 7 ):
        state = MOVING_DOWN  
      elif( point[0] ==0 ):
        state = MOVING_RIGHT


n = 8
zigzag_sequence_array = np.empty( (n*n, 2), dtype=int)

# 生成zigzag数组 item项形如[2,3]
for (i,point) in zigzag_sequence_generator(n):
  zigzag_sequence_array[i] = point


def zigzag_block_to_array(block_quant_known):
  '''
    将量化之后的8*8矩阵 根据zigzag编码 转换为一维数组
  '''
  array = np.empty(n*n, dtype=int)
  for i,point in enumerate(zigzag_sequence_array): #enumerate 数组转化为对象
    array[i] = block_quant_known[tuple(point)]
  return array

def zigzag_array_to_block(array_known):
  '''
    将zigzag编码后的一维数组 转化为 8*8的矩阵块
  '''
  block = np.empty((n,n), dtype=int)
  for i,point in enumerate(zigzag_sequence_array):
    block[tuple(point)] = array_known[i]
  return block

import argparse
import numpy as np

from bitarray         import bitarray
from bitarray.util    import ba2int
from PIL              import Image
from DC_AC_extract    import dct_quant_and_extract_DC_AC_from_padded_matrix
from entropy_encoding import DPCM, RLE, decompose_RLE_list_to_huffmanResBitarray_valueBitarray, encode_DC_entropy_all,decode_DC_entropy_all
from padding_image    import *

def main():
  # 输入格式 python encoder.py 1.jpg 123
  parser = argparse.ArgumentParser()
  parser.add_argument("image_to_compress", help="path to the input image")
  parser.add_argument("compressed_file", help="path to the output compressed file")
  args = parser.parse_args()
  input_image_path  = args.image_to_compress
  output_image_path = args.compressed_file
  image_to_compress = Image.open(input_image_path) # 读入图片

  # 1. 图片由RGB转换为YCbCr
  ycbcr = image_to_compress.convert('YCbCr') # RGB转换为YCbCr
  npmat = np.array(ycbcr, dtype=int) - 128 # 归一化处理,每一个分量的范围-128~127, npmat:width * height * 3
  # print(npmat)
  rows,cols = npmat.shape[0],npmat.shape[1] # 像素的行数和列数

  # 2. 色度缩减取样 subsampling(4:2:0) + padding
  '''
    这个步骤中,需要进行2*2的矩阵分块,
    之后矩阵大小变为 rows/2 和 cols/2,
    而dct中需要切割8*8的矩阵块,则 rows/2和cols/2需要是8的倍数
    故:rows和cols是16的倍数

    方法1:直接把原图像的rows和cols变为16的倍数
    方法2:Y,Cb,Cr分离,对于Y只需要8的倍数,而Cb和Cr先采样,再变为8的倍数
      demo:7*7大小的矩阵,采样后变为4*4,然后变为8*8;
  '''
  y  = npmat[:,:,0] # 取每一个元素的第三个分量 y:width * height * 1,YCbCr中的Y分量
  Cb = npmat[::2,::2,1] # 行数和列数步长为2
  Cr = npmat[::2,::2,2] # 行数和列数步长为2
  
  Y_padded  = martix_padding(y)  # 将矩阵补充成8的倍数
  Cb_padded = martix_padding(Cb) # 将矩阵补充成8的倍数
  Cr_padded = martix_padding(Cr) # 将矩阵补充成8的倍数

  # 3. dct变换 + quant量化  + dc/ac提取

  # 提取dc和ac系数
  dc_y,ac_arrays_y   = dct_quant_and_extract_DC_AC_from_padded_matrix(Y_padded,  'lum')
  dc_cb,ac_arrays_cb = dct_quant_and_extract_DC_AC_from_padded_matrix(Cb_padded, 'chrom')
  dc_cr,ac_arrays_cr = dct_quant_and_extract_DC_AC_from_padded_matrix(Cr_padded, 'chrom')

  # dpcm + dc的熵编码
  # 差分编码
  dpcm_y  = DPCM(dc_y)
  dpcm_cb = DPCM(dc_cb)
  dpcm_cr = DPCM(dc_cr)

  # dc差分编码
  size_bitarray_dc_y, value_bitarray_dc_y = encode_DC_entropy_all(dpcm_y)
  size_bitarray_dc_cb, value_bitarray_dc_cb = encode_DC_entropy_all(dpcm_cb)
  size_bitarray_dc_cr, value_bitarray_dc_cr = encode_DC_entropy_all(dpcm_cr)

  # 对于每一个8*8矩阵块的ac系数进行RLE行程编码,然后通过比特编码联合在一起
  huffman_res_bitarray_ac_y = bitarray()
  value_res_bitarray_ac_y = bitarray()
  for ac_y in ac_arrays_y:
    tmp_RLE_res = RLE(ac_y)
    huffman_tmp_bitarray_ac_y, value_tmp_bitarray_ac_y = decompose_RLE_list_to_huffmanResBitarray_valueBitarray(tmp_RLE_res)
    huffman_res_bitarray_ac_y += huffman_tmp_bitarray_ac_y
    value_res_bitarray_ac_y   += value_tmp_bitarray_ac_y
  
  huffman_res_bitarray_ac_cb = bitarray()
  value_res_bitarray_ac_cb   = bitarray()
  for ac_cb in ac_arrays_cb:
    tmp_RLE_res = RLE(ac_cb)
    huffman_tmp_bitarray_ac_cb, value_tmp_bitarray_ac_cb = decompose_RLE_list_to_huffmanResBitarray_valueBitarray(tmp_RLE_res)
    huffman_res_bitarray_ac_cb += huffman_tmp_bitarray_ac_cb
    value_res_bitarray_ac_cb   += value_tmp_bitarray_ac_cb
    
  huffman_res_bitarray_ac_cr = bitarray()
  value_res_bitarray_ac_cr   = bitarray()
  for ac_cr in ac_arrays_cr:
    tmp_RLE_res = RLE(ac_cr)
    huffman_tmp_bitarray_ac_cr, value_tmp_bitarray_ac_cr = decompose_RLE_list_to_huffmanResBitarray_valueBitarray(tmp_RLE_res)
    huffman_res_bitarray_ac_cr += huffman_tmp_bitarray_ac_cr
    value_res_bitarray_ac_cr += value_tmp_bitarray_ac_cr


  with open(output_image_path, 'wb') as outFile:
    bitarray_lst = [
      size_bitarray_dc_y,value_bitarray_dc_y,\
      size_bitarray_dc_cb,value_bitarray_dc_cb,\
      size_bitarray_dc_cr,value_bitarray_dc_cr,\
      huffman_res_bitarray_ac_y,value_res_bitarray_ac_y,\
      huffman_res_bitarray_ac_cb,value_res_bitarray_ac_cb,\
      huffman_res_bitarray_ac_cr,value_res_bitarray_ac_cr
    ]
    
    write_bitarray = bitarray()
    rows_barr = bitarray( format(rows, '#018b')[2:] )
    cols_barr = bitarray( format(cols, '#018b')[2:] ) 
    write_bitarray += rows_barr 
    write_bitarray += cols_barr
    print("行数:"+ format(rows, '#018b')[2:] + "  列数:" + format(cols, '#018b')[2:])

    for barr in bitarray_lst:
      cur_bit_len_barr = bitarray( format(len(barr), '#034b')[2:] )
      write_bitarray += cur_bit_len_barr

    for barr in bitarray_lst:
      write_bitarray += barr

    write_bitarray.tofile(outFile)


if __name__ == "__main__":
  main()
python compression jpeg
© www.soinside.com 2019 - 2024. All rights reserved.