Adding numeric type support
authorSvetlana Derevyanko <svetlo_nika@mail.ru>
Fri, 20 Aug 2021 10:04:10 +0000 (13:04 +0300)
committerYura Sokolov <funny.falcon@gmail.com>
Thu, 14 Oct 2021 08:05:22 +0000 (11:05 +0300)
Added numeric type support.
Move data alignment, detoasting and decompression into a separate function
to avoid redundancy.

README.pg_filedump
decode.c
decode.h
pg_filedump.c

index 3654e5a05c364955a4b8cc4a17ee26051a6e69dc..d10bfb3330d174319e57b70444d1b9685e29e08e 100644 (file)
@@ -85,6 +85,7 @@ The following options are valid for heap and index files:
         * json
         * macaddr
         * name
+        * numeric
         * oid
         * real
         * serial
index 027459e909c2518d01a5333711077e2e4b1c0b10..ad5f1c64c057a5faecf12e574aeca2c4c6474ae2 100644 (file)
--- a/decode.c
+++ b/decode.c
@@ -29,7 +29,8 @@
 static int
 ReadStringFromToast(const char *buffer,
                unsigned int buff_size,
-               unsigned int* out_size);
+               unsigned int* out_size,
+               int (*parse_value)(const char *, int));
 
 /*
  * Utilities for manipulation of header information for compressed
@@ -104,6 +105,12 @@ decode_char(const char *buffer, unsigned int buff_size, unsigned int *out_size);
 static int
 decode_name(const char *buffer, unsigned int buff_size, unsigned int *out_size);
 
+static int
+decode_numeric(const char *buffer, unsigned int buff_size, unsigned int *out_size);
+
+static int
+extract_data(const char *buffer, unsigned int buff_size, unsigned int *out_size, int (*parse_value)(const char *, int));
+
 static int
 decode_ignore(const char *buffer, unsigned int buff_size, unsigned int *out_size);
 
@@ -181,6 +188,9 @@ static ParseCallbackTableItem callback_table[] =
        {
                "name", &decode_name
        },
+       {
+               "numeric", &decode_numeric
+       },
        {
                "char", &decode_char
        },
@@ -263,7 +273,7 @@ CopyAppend(const char *str)
  * Append given string to current COPY line and encode special symbols
  * like \r, \n, \t and \\.
  */
-static void
+static int
 CopyAppendEncode(const char *str, int orig_len)
 {
        /*
@@ -339,6 +349,7 @@ CopyAppendEncode(const char *str, int orig_len)
 
        tmp_buff[curr_offset] = '\0';
        CopyAppend(tmp_buff);
+       return 0;
 }
 
 /* CopyAppend version with format string support */
@@ -348,6 +359,144 @@ CopyAppendEncode(const char *str, int orig_len)
          CopyAppend(__copy_format_buff); \
   } while(0)
 
+/*
+ * Decode the payload of a numeric value and append its decimal text form
+ * to the current COPY line.
+ *
+ * buffer points at the numeric header word (it may be unaligned) and
+ * num_size is the payload length in bytes: header plus base-NBASE digits.
+ *
+ * Returns 0 on success, or -2 when the payload is too short for its header
+ * or carries an unrecognized special-value header.
+ */
+static int
+CopyAppendNumeric(const char *buffer, int num_size)
+{
+       struct NumericData num;
+       NumericDigit   *digits;
+       NumericDigit    dig;
+       NumericDigit    d1;
+       char           *str;
+       char           *cp;
+       char           *endcp;
+       bool            putit;
+       int             hdr_size;
+       int             ndigits;
+       int             sign, weight, dscale;
+       int             i, d, divisor;
+
+       /* The 16-bit flag word must be present before anything can be decoded. */
+       if (num_size < (int) sizeof(uint16))
+               return -2;
+
+       /*
+        * Copy only the bytes that exist: buffer may be unaligned, and a short
+        * header with no digits is smaller than struct NumericData.
+        */
+       memset(&num, 0, sizeof(num));
+       memcpy(&num, buffer,
+                  (size_t) num_size < sizeof(num) ? (size_t) num_size : sizeof(num));
+
+       if (NUMERIC_IS_SPECIAL(&num))
+       {
+               if (NUMERIC_IS_NINF(&num))
+                       CopyAppend("-Infinity");
+               else if (NUMERIC_IS_PINF(&num))
+                       CopyAppend("Infinity");
+               else if (NUMERIC_IS_NAN(&num))
+                       CopyAppend("NaN");
+               else
+                       return -2;
+               return 0;
+       }
+
+       hdr_size = (int) NUMERIC_HEADER_SIZE(&num);
+       if (num_size < hdr_size)
+               return -2;
+
+       sign = NUMERIC_SIGN(&num);
+       weight = NUMERIC_WEIGHT(&num);
+       dscale = NUMERIC_DSCALE(&num);
+
+       /* Only the bytes that follow the numeric header hold digits. */
+       ndigits = (num_size - hdr_size) / (int) sizeof(NumericDigit);
+       if (ndigits == 0)
+       {
+               /* No digits - compressed zero. */
+               CopyAppendFmt("%d", 0);
+               return 0;
+       }
+
+       /* Work on an aligned private copy; buffer + hdr_size may be unaligned. */
+       digits = palloc(ndigits * sizeof(NumericDigit));
+       memcpy(digits, buffer + hdr_size, ndigits * sizeof(NumericDigit));
+
+       i = (weight + 1) * DEC_DIGITS;
+       if (i <= 0)
+               i = 1;
+
+       /* Room for sign, integer part, '.', padded fraction and the NUL. */
+       str = palloc(i + dscale + DEC_DIGITS + 2);
+       cp = str;
+
+       /* Output a dash for negative values */
+       if (sign == NUMERIC_NEG)
+               *cp++ = '-';
+
+       /* Output all digits before the decimal point */
+       if (weight < 0)
+       {
+               d = weight + 1;
+               *cp++ = '0';
+       }
+       else
+       {
+               for (d = 0; d <= weight; d++)
+               {
+                       dig = (d < ndigits) ? digits[d] : 0;
+                       /* In the first digit, suppress extra leading decimal zeroes */
+                       putit = (d > 0);
+                       for (divisor = 1000; divisor >= 10; divisor /= 10)
+                       {
+                               d1 = dig / divisor;
+                               dig -= d1 * divisor;
+                               putit |= (d1 > 0);
+                               if (putit)
+                                       *cp++ = d1 + '0';
+                       }
+                       *cp++ = dig + '0';
+               }
+       }
+
+       /*
+        * If requested, output a decimal point and all the digits that follow it.
+        * We initially put out a multiple of DEC_DIGITS digits, then truncate if
+        * needed.
+        */
+       if (dscale > 0)
+       {
+               *cp++ = '.';
+               endcp = cp + dscale;
+               for (i = 0; i < dscale; d++, i += DEC_DIGITS)
+               {
+                       dig = (d >= 0 && d < ndigits) ? digits[d] : 0;
+                       for (divisor = 1000; divisor >= 10; divisor /= 10)
+                       {
+                               d1 = dig / divisor;
+                               dig -= d1 * divisor;
+                               *cp++ = d1 + '0';
+                       }
+                       *cp++ = dig + '0';
+               }
+               cp = endcp;
+       }
+       *cp = '\0';
+       CopyAppend(str);
+       pfree(str);
+       pfree(digits);
+       return 0;
+}
+
 /* Discard accumulated COPY line */
 static void
 CopyClear(void)
@@ -811,6 +960,16 @@ decode_name(const char *buffer, unsigned int buff_size, unsigned int *out_size)
        return 0;
 }
 
+/*
+ * Decode a numeric attribute: extract_data() unpacks the stored value and
+ * passes the payload to CopyAppendNumeric() for formatting.
+ */
+static int
+decode_numeric(const char *buffer, unsigned int buff_size, unsigned int *out_size)
+{
+       return extract_data(buffer, buff_size, out_size, CopyAppendNumeric);
+}
+
 /* Decode a char type */
 static int
 decode_char(const char *buffer, unsigned int buff_size, unsigned int *out_size)
@@ -834,8 +993,20 @@ decode_ignore(const char *buffer, unsigned int buff_size, unsigned int *out_size
 /* Decode char(N), varchar(N), text, json or xml types */
 static int
 decode_string(const char *buffer, unsigned int buff_size, unsigned int *out_size)
+{
+       int result = extract_data(buffer, buff_size, out_size, &CopyAppendEncode);
+       return result;
+}
+
+/*
+ * Align data, parse varlena header, detoast and decompress.
+ * The last parameter is a callback that does the actual type-specific parsing.
+ */
+static int
+extract_data(const char *buffer, unsigned int buff_size, unsigned int *out_size, int (*parse_value)(const char *, int))
 {
        int                     padding = 0;
+       int                     result  = 0;
 
        /* Skip padding bytes. */
        while (*buffer == 0x00)
@@ -854,14 +1025,13 @@ decode_string(const char *buffer, unsigned int buff_size, unsigned int *out_size
                 * 00000001 1-byte length word, unaligned, TOAST pointer
                 */
                uint32          len = VARSIZE_EXTERNAL(buffer);
-               int                     result = 0;
 
                if (len > buff_size)
                        return -1;
 
                if (blockOptions & BLOCK_DECODE_TOAST)
                {
-                       result = ReadStringFromToast(buffer, buff_size, out_size);
+                       result = ReadStringFromToast(buffer, buff_size, out_size, parse_value);
                }
                else
                {
@@ -883,9 +1053,9 @@ decode_string(const char *buffer, unsigned int buff_size, unsigned int *out_size
                if (len > buff_size)
                        return -1;
 
-               CopyAppendEncode(buffer + 1, len - 1);
+               result = parse_value(buffer + 1, len - 1);
                *out_size = padding + len;
-               return 0;
+               return result;
        }
 
        if (VARATT_IS_4B_U(buffer) && buff_size >= 4)
@@ -898,9 +1068,9 @@ decode_string(const char *buffer, unsigned int buff_size, unsigned int *out_size
                if (len > buff_size)
                        return -1;
 
-               CopyAppendEncode(buffer + 4, len - 4);
+               result = parse_value(buffer + 4, len - 4);
                *out_size = padding + len;
-               return 0;
+               return result;
        }
 
        if (VARATT_IS_4B_C(buffer) && buff_size >= 8)
@@ -911,7 +1081,9 @@ decode_string(const char *buffer, unsigned int buff_size, unsigned int *out_size
                int                                             decompress_ret;
                uint32                                  len = VARSIZE_4B(buffer);
                uint32                                  decompressed_len = 0;
+#if PG_VERSION_NUM >= 140000
                ToastCompressionId              cmid;
+#endif
 
 #if PG_VERSION_NUM >= 140000
                decompressed_len = VARDATA_COMPRESSED_GET_EXTSIZE(buffer);
@@ -934,31 +1106,32 @@ decode_string(const char *buffer, unsigned int buff_size, unsigned int *out_size
                        return 0;
                }
 
+#if PG_VERSION_NUM >= 140000
                cmid = VARDATA_COMPRESSED_GET_COMPRESS_METHOD(buffer);
                switch(cmid)
                {
                        case TOAST_PGLZ_COMPRESSION_ID:
                                decompress_ret = pglz_decompress(VARDATA_4B_C(buffer), len - 2 * sizeof(uint32),
-                                                                                                decompress_tmp_buff, decompressed_len
-#if PG_VERSION_NUM >= 120000
-                                                                                                , true
-#endif
-                                                                                                );
+                                                                                                decompress_tmp_buff, decompressed_len, true);
                                break;
-                       case TOAST_LZ4_COMPRESSION_ID:
 #ifdef USE_LZ4
+                       case TOAST_LZ4_COMPRESSION_ID:
                                decompress_ret = LZ4_decompress_safe(VARDATA_4B_C(buffer), decompress_tmp_buff,
                                                                                                         len - 2 * sizeof(uint32), decompressed_len);
                                break;
-#else
-                               printf("Error: compression method lz4 not supported.\n");
-                               printf("Try to rebuild pg_filedump for PostgreSQL server of version 14+ with --with-lz4 option.\n");
-                               return -2;
 #endif
                        default:
                                decompress_ret = -1;
                                break;
                }
+#else /* PG_VERSION_NUM < 140000 */
+               decompress_ret = pglz_decompress(VARDATA_4B_C(buffer), len - 2 * sizeof(uint32),
+                                                                                decompress_tmp_buff, decompressed_len
+#if PG_VERSION_NUM >= 120000
+                                                                                , true
+#endif
+                                                                                );
+#endif /* PG_VERSION_NUM >= 140000 */
 
                if ((decompress_ret != decompressed_len) || (decompress_ret < 0))
                {
@@ -968,9 +1141,9 @@ decode_string(const char *buffer, unsigned int buff_size, unsigned int *out_size
                        return 0;
                }
 
-               CopyAppendEncode(decompress_tmp_buff, decompressed_len);
+               result = parse_value(decompress_tmp_buff, decompressed_len);
                *out_size = padding + len;
-               return 0;
+               return result;
        }
 
        return -9;
@@ -1033,7 +1206,7 @@ FormatDecode(const char *tupleData, unsigned int tupleSize)
        CopyFlush();
 }
 
-static int DumpCompressedString(const char *data, int32 compressed_size)
+static int DumpCompressedString(const char *data, int32 compressed_size, int (*parse_value)(const char *, int))
 {
        int                                             decompress_ret;
        char                               *decompress_tmp_buff = malloc(TOAST_COMPRESS_RAWSIZE(data));
@@ -1087,7 +1260,8 @@ static int DumpCompressedString(const char *data, int32 compressed_size)
 static int
 ReadStringFromToast(const char *buffer,
                unsigned int buff_size,
-               unsigned int* out_size)
+               unsigned int* out_size,
+               int (*parse_value)(const char *, int))
 {
        int             result = 0;
 
@@ -1160,9 +1334,9 @@ ReadStringFromToast(const char *buffer,
                        if (result == 0)
                        {
                                if (VARATT_EXTERNAL_IS_COMPRESSED(toast_ptr))
-                                       result = DumpCompressedString(toast_data, toast_ext_size);
+                                       result = DumpCompressedString(toast_data, toast_ext_size, parse_value);
                                else
-                                       CopyAppendEncode(toast_data, toast_ext_size);
+                                       result = parse_value(toast_data, toast_ext_size);
                        }
                        else
                        {
index 4d151a476024d4b2aa97b4823d7760d1ef9a2533..24ba2e6a9538081d5aa900e539e36082fd145d80 100644 (file)
--- a/decode.h
+++ b/decode.h
@@ -1,6 +1,14 @@
 #ifndef _PG_FILEDUMP_DECODE_H_
 #define _PG_FILEDUMP_DECODE_H_
 
+#define NBASE          10000   /* radix of one NumericDigit (10^DEC_DIGITS) */
+#define HALF_NBASE     5000    /* NBASE / 2 */
+#define DEC_DIGITS     4                       /* decimal digits per NBASE digit */
+#define MUL_GUARD_DIGITS       2       /* these are measured in NBASE digits */
+#define DIV_GUARD_DIGITS       4
+
+typedef int16 NumericDigit;    /* one base-NBASE digit */
+
 int
 ParseAttributeTypesString(const char *str);
 
@@ -15,4 +23,122 @@ ToastChunkDecode(const char* tuple_data,
                char *chunk_data,
                unsigned int *chunk_data_size);
 
+struct NumericShort
+{
+       uint16          n_header;               /* Flag bits + sign + display scale + weight */
+       NumericDigit n_data[FLEXIBLE_ARRAY_MEMBER]; /* Digits, DEC_DIGITS decimal digits each */
+};
+
+struct NumericLong
+{
+       uint16          n_sign_dscale;  /* Flag/sign bits + display scale */
+       int16           n_weight;               /* Weight of 1st digit  */
+       NumericDigit n_data[FLEXIBLE_ARRAY_MEMBER]; /* Digits, DEC_DIGITS decimal digits each */
+};
+
+union NumericChoice
+{
+       uint16          n_header;               /* Header word; its top bits select the format */
+       struct NumericLong n_long;      /* Long form (4-byte header) */
+       struct NumericShort n_short;    /* Short form (2-byte header) */
+};
+
+struct NumericData
+{
+       union NumericChoice choice; /* choice of format */
+};
+
+/*
+ * Interpretation of high bits.
+ */
+
+#define NUMERIC_SIGN_MASK      0xC000
+#define NUMERIC_POS                    0x0000
+#define NUMERIC_NEG                    0x4000
+#define NUMERIC_SHORT          0x8000
+#define NUMERIC_SPECIAL                0xC000
+
+#define NUMERIC_FLAGBITS(n) ((n)->choice.n_header & NUMERIC_SIGN_MASK)
+#define NUMERIC_IS_SHORT(n)            (NUMERIC_FLAGBITS(n) == NUMERIC_SHORT)
+#define NUMERIC_IS_SPECIAL(n)  (NUMERIC_FLAGBITS(n) == NUMERIC_SPECIAL)
+
+#define NUMERIC_HDRSZ  (VARHDRSZ + sizeof(uint16) + sizeof(int16))
+#define NUMERIC_HDRSZ_SHORT (VARHDRSZ + sizeof(uint16))
+
+/*
+ * If the flag bits are NUMERIC_SHORT or NUMERIC_SPECIAL, we want the short
+ * header; otherwise, we want the long one.  Instead of testing against each
+ * value, we can just look at the high bit, for a slight efficiency gain.
+ */
+#define NUMERIC_HEADER_IS_SHORT(n)     (((n)->choice.n_header & 0x8000) != 0)
+#define NUMERIC_HEADER_SIZE(n) \
+       (sizeof(uint16) + \
+        (NUMERIC_HEADER_IS_SHORT(n) ? 0 : sizeof(int16)))
+
+/*
+ * Definitions for special values (NaN, positive infinity, negative infinity).
+ *
+ * The two bits after the NUMERIC_SPECIAL bits are 00 for NaN, 01 for positive
+ * infinity, 11 for negative infinity.  (This makes the sign bit match where
+ * it is in a short-format value, though we make no use of that at present.)
+ * We could mask off the remaining bits before testing the active bits, but
+ * currently those bits must be zeroes, so masking would just add cycles.
+ */
+#define NUMERIC_EXT_SIGN_MASK  0xF000  /* high bits plus NaN/Inf flag bits */
+#define NUMERIC_NAN                            0xC000
+#define NUMERIC_PINF                   0xD000
+#define NUMERIC_NINF                   0xF000
+#define NUMERIC_INF_SIGN_MASK  0x2000
+
+#define NUMERIC_EXT_FLAGBITS(n)        ((n)->choice.n_header & NUMERIC_EXT_SIGN_MASK)
+#define NUMERIC_IS_NAN(n)              ((n)->choice.n_header == NUMERIC_NAN)
+#define NUMERIC_IS_PINF(n)             ((n)->choice.n_header == NUMERIC_PINF)
+#define NUMERIC_IS_NINF(n)             ((n)->choice.n_header == NUMERIC_NINF)
+#define NUMERIC_IS_INF(n) \
+       (((n)->choice.n_header & ~NUMERIC_INF_SIGN_MASK) == NUMERIC_PINF)
+
+/*
+ * Short format definitions.
+ */
+
+#define NUMERIC_SHORT_SIGN_MASK                        0x2000
+#define NUMERIC_SHORT_DSCALE_MASK              0x1F80
+#define NUMERIC_SHORT_DSCALE_SHIFT             7
+#define NUMERIC_SHORT_DSCALE_MAX               \
+       (NUMERIC_SHORT_DSCALE_MASK >> NUMERIC_SHORT_DSCALE_SHIFT)
+#define NUMERIC_SHORT_WEIGHT_SIGN_MASK 0x0040
+#define NUMERIC_SHORT_WEIGHT_MASK              0x003F
+#define NUMERIC_SHORT_WEIGHT_MAX               NUMERIC_SHORT_WEIGHT_MASK
+#define NUMERIC_SHORT_WEIGHT_MIN               (-(NUMERIC_SHORT_WEIGHT_MASK+1))
+
+/*
+ * Extract sign, display scale, weight.  These macros extract field values
+ * from the Numeric (on-disk) format.
+ *
+ * Note that we don't trouble to ensure that dscale and weight read as zero
+ * for an infinity; however, that doesn't matter since "special" numerics
+ * (NaN and the infinities) must be recognized with NUMERIC_IS_SPECIAL()
+ * before any of these fields are interpreted.
+ */
+
+#define NUMERIC_DSCALE_MASK                    0x3FFF
+#define NUMERIC_DSCALE_MAX                     NUMERIC_DSCALE_MASK
+
+#define NUMERIC_SIGN(n) \
+       (NUMERIC_IS_SHORT(n) ? \
+               (((n)->choice.n_short.n_header & NUMERIC_SHORT_SIGN_MASK) ? \
+                NUMERIC_NEG : NUMERIC_POS) : \
+               (NUMERIC_IS_SPECIAL(n) ? \
+                NUMERIC_EXT_FLAGBITS(n) : NUMERIC_FLAGBITS(n)))
+#define NUMERIC_DSCALE(n)      (NUMERIC_HEADER_IS_SHORT((n)) ? \
+       ((n)->choice.n_short.n_header & NUMERIC_SHORT_DSCALE_MASK) \
+               >> NUMERIC_SHORT_DSCALE_SHIFT \
+       : ((n)->choice.n_long.n_sign_dscale & NUMERIC_DSCALE_MASK))
+#define NUMERIC_WEIGHT(n)      (NUMERIC_HEADER_IS_SHORT((n)) ? \
+       (((n)->choice.n_short.n_header & NUMERIC_SHORT_WEIGHT_SIGN_MASK ? \
+               ~NUMERIC_SHORT_WEIGHT_MASK : 0) \
+        | ((n)->choice.n_short.n_header & NUMERIC_SHORT_WEIGHT_MASK)) \
+       : ((n)->choice.n_long.n_weight))
+
+
 #endif
index ba43550679f25ab790d914f66f0c57943b8cc075..bdd9266627a0f840d38d4554ca44d10699c57131 100644 (file)
@@ -177,7 +177,7 @@ DisplayOptions(unsigned int validOptions)
                 "  -D  Decode tuples using given comma separated list of types\n"
                 "      Supported types:\n"
                 "        bigint bigserial bool char charN date float float4 float8 int\n"
-                "        json macaddr name oid real serial smallint smallserial text\n"
+                "        json macaddr name numeric oid real serial smallint smallserial text\n"
                 "        time timestamp timetz uuid varchar varcharN xid xml\n"
                 "      ~ ignores all attributes left in a tuple\n"
                 "  -f  Display formatted block content dump along with interpretation\n"