source: trunk/module-stat.c @ 5375

Last change on this file since 5375 was 5370, checked in by Admin, 8 years ago

Cleanup memory in pipe on client exit. Fix some possible segfault sources. Fix wrong display of au parameter on WebIf status page if config includes an empty au line.

File size: 28.5 KB
Line 
1#include "globals.h"
2
3#ifdef WITH_LB
4#include "module-cccam.h"
5
6#define UNDEF_AVG_TIME 80000
7#define MAX_ECM_SEND_CACHE 16
8
9#define LB_REOPEN_MODE_STANDARD 0
10#define LB_REOPEN_MODE_FAST 1
11
12#define LB_NONE 0
13#define LB_FASTEST_READER_FIRST 1
14#define LB_OLDEST_READER_FIRST 2
15#define LB_LOWEST_USAGELEVEL 3
16#define LB_LOG_ONLY 10
17
18static int32_t stat_load_save;
19static struct timeb nulltime;
20static time_t last_housekeeping = 0;
21
22void init_stat()
23{
24    cs_ftime(&nulltime);
25    stat_load_save = -100;
26
27    //checking config
28    if (cfg.lb_nbest_readers < 2)
29        cfg.lb_nbest_readers = DEFAULT_NBEST;
30    if (cfg.lb_nfb_readers < 2)
31        cfg.lb_nfb_readers = DEFAULT_NFB;
32    if (cfg.lb_min_ecmcount < 2)
33        cfg.lb_min_ecmcount = DEFAULT_MIN_ECM_COUNT;
34    if (cfg.lb_max_ecmcount < 3)
35        cfg.lb_max_ecmcount = DEFAULT_MAX_ECM_COUNT;
36    if (cfg.lb_reopen_seconds < 10)
37        cfg.lb_reopen_seconds = DEFAULT_REOPEN_SECONDS;
38    if (cfg.lb_retrylimit <= 0)
39        cfg.lb_retrylimit = DEFAULT_RETRYLIMIT;
40    if (cfg.lb_stat_cleanup <= 0)
41        cfg.lb_stat_cleanup = DEFAULT_LB_STAT_CLEANUP;
42}
43
44#define LINESIZE 1024
45
46void load_stat_from_file()
47{
48    stat_load_save = 0;
49    char buf[256];
50    char *line;
51    char *fname;
52    FILE *file;
53    if (!cfg.lb_savepath || !cfg.lb_savepath[0]) {
54        snprintf(buf, sizeof(buf), "%s/stat", get_tmp_dir());
55        fname = buf;
56    }
57    else
58        fname = cfg.lb_savepath;
59       
60    file = fopen(fname, "r");
61       
62    if (!file) {
63        cs_log("loadbalancer: can't read from file %s", fname);
64        return;
65    }
66    cs_debug_mask(D_TRACE, "loadbalancer: load statistics from %s", fname);
67
68    struct timeb ts, te;
69    cs_ftime(&ts);
70           
71    struct s_reader *rdr = NULL;
72    READER_STAT *stat, *dup=NULL;
73    line = cs_malloc(&line, LINESIZE, 0);
74       
75    int32_t i=1;
76    int32_t valid=0;
77    int32_t count=0;
78    int32_t type=0;
79    char *ptr, *saveptr1 = NULL;
80    char *split[10];
81   
82    while (fgets(line, LINESIZE, file))
83    {
84        if (!line[0] || line[0] == '#' || line[0] == ';')
85            continue;
86       
87        if(!cs_malloc(&stat,sizeof(READER_STAT), -1)) continue;
88
89        //get type by evaluating first line:
90        if (type==0) {
91            if (strstr(line, " rc ")) type = 2;
92            else type = 1;
93        }   
94       
95        if (type==1) { //New format - faster parsing:
96            for (i = 0, ptr = strtok_r(line, ",", &saveptr1); ptr && i<10 ; ptr = strtok_r(NULL, ",", &saveptr1), i++)
97                split[i] = ptr;
98            valid = (i==10);
99            if (valid) {
100                strncpy(buf, split[0], sizeof(buf)-1);
101                stat->rc = atoi(split[1]);
102                stat->caid = a2i(split[2], 4);
103                stat->prid = a2i(split[3], 6);
104                stat->srvid = a2i(split[4], 4);
105                stat->time_avg = atoi(split[5]);
106                stat->ecm_count = atoi(split[6]);
107                stat->last_received = atol(split[7]);
108                stat->fail_factor = atoi(split[8]);
109                stat->ecmlen = a2i(split[9], 2);
110            }
111        } else { //Old format - keep for compatibility:
112            i = sscanf(line, "%s rc %d caid %04hX prid %06X srvid %04hX time avg %dms ecms %d last %ld fail %d len %02hX\n",
113                buf, &stat->rc, &stat->caid, &stat->prid, &stat->srvid, 
114                &stat->time_avg, &stat->ecm_count, &stat->last_received, &stat->fail_factor, &stat->ecmlen);
115            valid = i>5;
116        }
117       
118        if (valid) {
119            if (rdr == NULL || strcmp(buf, rdr->label) != 0) {
120                LL_ITER itr = ll_iter_create(configured_readers);
121                while ((rdr=ll_iter_next(&itr))) {
122                    if (strcmp(rdr->label, buf) == 0) {
123                        break;
124                    }
125                }
126            }
127           
128            if (rdr != NULL && strcmp(buf, rdr->label) == 0) {
129                if (!rdr->lb_stat)
130                    rdr->lb_stat = ll_create();
131                   
132                //Duplicate check:
133                if (cs_dblevel == 0xFF) //Only with full debug for faster reading...
134                    dup = get_stat(rdr, stat->caid, stat->prid, stat->srvid, stat->ecmlen);
135                   
136                if (dup)
137                    free(stat); //already loaded
138                else {
139                    ll_append(rdr->lb_stat, stat);
140                    count++;
141                }
142            }
143            else 
144            {
145                cs_log("loadbalancer: statistics could not be loaded for %s", buf);
146                free(stat);
147            }
148        }
149        else 
150        {
151            cs_debug_mask(D_TRACE, "loadbalancer: statistics ERROR: %s rc=%d i=%d", buf, stat->rc, i);
152            free(stat);
153        }
154    } 
155    fclose(file);
156    free(line);
157
158    cs_ftime(&te);
159#ifdef WITH_DEBUG
160    int32_t time = 1000*(te.time-ts.time)+te.millitm-ts.millitm;
161
162    cs_debug_mask(D_TRACE, "loadbalancer: statistics loaded %d records in %dms", count, time);
163#endif
164}
165
166static uint32_t get_prid(uint16_t caid, uint32_t prid)
167{
168    int32_t i;
169    for (i=0;i<CS_MAXCAIDTAB;i++) {
170        uint16_t tcaid = cfg.lb_noproviderforcaid.caid[i];
171        if (!tcaid) break;
172        if (tcaid == caid) {
173            prid = 0;
174            break;
175        }
176        if (tcaid < 0x0100 && (caid >> 8) == tcaid) {
177            prid = 0;
178            break;
179        }
180       
181    }
182    return prid;
183}
184
185/**
186 * get statistic values for reader ridx and caid/prid/srvid/ecmlen
187 */
188READER_STAT *get_stat(struct s_reader *rdr, uint16_t caid, uint32_t prid, uint16_t srvid, int16_t ecmlen)
189{
190    if (!rdr->lb_stat)
191        rdr->lb_stat = ll_create();
192
193    prid = get_prid(caid, prid);
194   
195    LL_ITER it = ll_iter_create(rdr->lb_stat);
196    READER_STAT *stat = NULL;
197    int32_t i = 0;
198    while ((stat = ll_iter_next(&it))) {
199        i++;
200        if (stat->caid==caid && stat->prid==prid && stat->srvid==srvid) {
201            if (stat->ecmlen == ecmlen)
202                break;
203            if (!stat->ecmlen) {
204                stat->ecmlen = ecmlen;
205                break;
206            }
207        }
208    }
209   
210    //Move stat to list start for faster access:
211    if (i > 10 && stat)
212        ll_iter_move_first(&it);
213   
214    return stat;
215}
216
217/**
218 * removes caid/prid/srvid/ecmlen from stat-list of reader ridx
219 */
220int32_t remove_stat(struct s_reader *rdr, uint16_t caid, uint32_t prid, uint16_t srvid, int16_t ecmlen)
221{
222    if (!rdr->lb_stat)
223        return 0;
224
225    int32_t c = 0;
226    LL_ITER it = ll_iter_create(rdr->lb_stat);
227    READER_STAT *stat;
228    while ((stat = ll_iter_next(&it))) {
229        if (stat->caid==caid && stat->prid==prid && stat->srvid==srvid) {
230            if (!stat->ecmlen || stat->ecmlen == ecmlen) {
231                ll_iter_remove_data(&it);
232                c++;
233            }
234        }
235    }
236    return c;
237}
238
239/**
240 * Calculates average time
241 */
242void calc_stat(READER_STAT *stat)
243{
244    int32_t i, c=0, t = 0;
245    for (i = 0; i < LB_MAX_STAT_TIME; i++) {
246        if (stat->time_stat[i] > 0) {
247            t += (int32_t)stat->time_stat[i];
248            c++;
249        }
250    }
251    if (!c)
252        stat->time_avg = UNDEF_AVG_TIME;
253    else
254        stat->time_avg = t / c;
255}
256
257/**
258 * Saves statistik to /tmp/.oscam/stat.n where n is reader-index
259 */
260void save_stat_to_file_thread()
261{
262    stat_load_save = 0;
263    char buf[256];
264    char *fname;
265    if (!cfg.lb_savepath || !cfg.lb_savepath[0]) {
266        snprintf(buf, sizeof(buf), "%s/stat", get_tmp_dir());
267        fname = buf;
268    }
269    else
270        fname = cfg.lb_savepath;
271       
272    FILE *file = fopen(fname, "w");
273   
274    if (!file) {
275        cs_log("can't write to file %s", fname);
276        return;
277    }
278   
279    struct timeb ts, te;
280    cs_ftime(&ts);
281         
282    time_t cleanup_time = time(NULL) - (cfg.lb_stat_cleanup*60*60);
283           
284    int32_t count=0;
285    struct s_reader *rdr;
286    LL_ITER itr = ll_iter_create(configured_readers);
287    while ((rdr=ll_iter_next(&itr))) {
288       
289        if (rdr->lb_stat) {
290            LL_ITER it = ll_iter_create(rdr->lb_stat);
291            READER_STAT *stat;
292            while ((stat = ll_iter_next(&it))) {
293           
294                if (stat->last_received < cleanup_time) { //cleanup old stats
295                    ll_iter_remove_data(&it);
296                    continue;
297                }
298               
299                //Old version, too slow to parse:
300                //fprintf(file, "%s rc %d caid %04hX prid %06X srvid %04hX time avg %dms ecms %d last %ld fail %d len %02hX\n",
301                //  rdr->label, stat->rc, stat->caid, stat->prid,
302                //  stat->srvid, stat->time_avg, stat->ecm_count, stat->last_received, stat->fail_factor, stat->ecmlen);
303               
304                //New version:
305                fprintf(file, "%s,%d,%04hX,%06X,%04hX,%d,%d,%ld,%d,%02hX\n",
306                    rdr->label, stat->rc, stat->caid, stat->prid, 
307                    stat->srvid, stat->time_avg, stat->ecm_count, stat->last_received, stat->fail_factor, stat->ecmlen);
308                count++;
309            }
310        }
311    }
312   
313    fclose(file);
314
315    cs_ftime(&te);
316    int32_t time = 1000*(te.time-ts.time)+te.millitm-ts.millitm;
317
318
319    cs_log("loadbalancer: statistic saved %d records to %s in %dms", count, fname, time);
320}
321
322void save_stat_to_file(int32_t thread)
323{
324    stat_load_save = 0;
325    if (thread)
326        start_thread((void*)&save_stat_to_file_thread, "save lb stats");
327    else
328        save_stat_to_file_thread();
329}
330
331/**
332 * fail_factor is multiplied to the reopen_time. This function increases the fail_factor
333 **/
334void inc_fail(READER_STAT *stat)
335{
336    if (stat->fail_factor <= 0)
337        stat->fail_factor = 1;
338    else
339        stat->fail_factor *= 2;
340}
341
342READER_STAT *get_add_stat(struct s_reader *rdr, ECM_REQUEST *er, uint32_t prid)
343{
344    READER_STAT *stat = get_stat(rdr, er->caid, prid, er->srvid, er->l);
345    if (!stat) {
346        if(cs_malloc(&stat,sizeof(READER_STAT), -1)){
347            stat->caid = er->caid;
348            stat->prid = prid;
349            stat->srvid = er->srvid;
350            stat->ecmlen = er->l;
351            stat->time_avg = UNDEF_AVG_TIME; //dummy placeholder
352            ll_append(rdr->lb_stat, stat);
353        }
354    }
355
356    if (stat->ecm_count < 0)
357        stat->ecm_count=0;
358       
359    return stat;
360}
361
362/**
363 * Adds caid/prid/srvid/ecmlen to stat-list for reader ridx with time/rc
364 */
365void add_stat(struct s_reader *rdr, ECM_REQUEST *er, int32_t ecm_time, int32_t rc)
366{
367    if (!rdr || !er || !cfg.lb_mode)
368        return;
369    struct s_client *cl = rdr->client;
370    if (!cl)
371        return;
372       
373    uint32_t prid = get_prid(er->caid, er->prid);
374   
375    READER_STAT *stat;
376   
377    //inc ecm_count if found, drop to 0 if not found:
378    // rc codes:
379    // 0 = found       +
380    // 1 = cache1      #
381    // 2 = cache2      #
382    // 3 = emu         +
383    // 4 = not found   -
384    // 5 = timeout     -2
385    // 6 = sleeping    #
386    // 7 = fake        #
387    // 8 = invalid     #
388    // 9 = corrupt     #
389    // 10= no card     #
390    // 11= expdate     #
391    // 12= disabled    #
392    // 13= stopped     #
393    // 100= unhandled  #
394    //        + = adds statistic values
395    //        # = ignored because of duplicate values, temporary failures or softblocks
396    //        - = causes loadbalancer to block this reader for this caid/prov/sid
397    //        -2 = causes loadbalancer to block if happens too often
398   
399    if (rc == 4 && (uint32_t)ecm_time >= cfg.ctimeout) //Map "not found" to "timeout" if ecm_time>client time out
400        rc = 5;
401
402    time_t ctime = time(NULL);
403   
404    if (rc == 0) { //found
405        stat = get_add_stat(rdr, er, prid);
406        stat->rc = 0;
407        stat->ecm_count++;
408        stat->last_received = ctime;
409        stat->fail_factor = 0;
410       
411        //If answering reader is a fallback reader, decrement answer time by fallback timeout:
412        struct s_reader *r;
413        LL_ITER it = ll_iter_create(er->matching_rdr);
414        int8_t is_fallback = 0;
415        while ((r=ll_iter_next(&it))) {
416            if (it.cur == er->fallback) is_fallback = 1;
417            if (r == rdr) {
418                if (is_fallback && (uint32_t)ecm_time >= cfg.ftimeout)
419                    ecm_time -= cfg.ftimeout;
420                break;
421            }
422        }
423       
424        //FASTEST READER:
425        stat->time_idx++;
426        if (stat->time_idx >= LB_MAX_STAT_TIME)
427            stat->time_idx = 0;
428        stat->time_stat[stat->time_idx] = ecm_time;
429        calc_stat(stat);
430
431        //OLDEST READER now set by get best reader!
432       
433       
434        //USAGELEVEL:
435        int32_t ule = rdr->lb_usagelevel_ecmcount;
436        if (ule > 0 && ((ule / cfg.lb_min_ecmcount) > 0)) //update every MIN_ECM_COUNT usagelevel:
437        {
438            time_t t = (ctime-rdr->lb_usagelevel_time);
439            rdr->lb_usagelevel = 1000/(t<1?1:t);
440            ule = 0;
441        }
442        if (ule == 0)
443            rdr->lb_usagelevel_time = ctime;
444        rdr->lb_usagelevel_ecmcount = ule+1;
445    }
446    else if (rc == 1 || rc == 2) { //cache
447        //no increase of statistics here, cachetime is not real time
448        stat = get_add_stat(rdr, er, prid);
449        stat->last_received = ctime;
450    }
451    else if (rc == 4) { //not found
452        stat = get_add_stat(rdr, er, prid);
453        //CCcam card can't decode, 0x28=NOK1, 0x29=NOK2
454        //CCcam loop detection = E2_CCCAM_LOOP
455        if (er->rcEx == E2_CCCAM_NOK1 || er->rcEx == E2_CCCAM_NOK2 || er->rcEx == E2_CCCAM_LOOP) {
456            stat->last_received = ctime; //to avoid timeouts
457            return;
458        }
459           
460        stat->rc = rc;
461        inc_fail(stat);
462        stat->last_received = ctime;
463       
464        //reduce ecm_count step by step
465        if (!cfg.lb_reopen_mode)
466            stat->ecm_count /= 10;
467    }
468    else if (rc == 5) { //timeout
469        stat = get_add_stat(rdr, er, prid);
470       
471        //catch suddenly occuring timeouts and block reader:
472        if ((int)(ctime-stat->last_received) < (int)(5*cfg.ctimeout) && 
473                stat->rc == 0 && stat->ecm_count == 0) {
474            stat->rc = 5;
475                //inc_fail(stat); //do not inc fail factor in this case
476        }
477        //reader is longer than 5s connected && not more then 5 pending ecms:
478        else if ((cl->login+(int)(2*cfg.ctimeout/1000)) < ctime && cl->pending < 5 && 
479                stat->rc == 0 && stat->ecm_count == 0) {
480            stat->rc = 5;
481            inc_fail(stat);
482        }
483               
484        stat->last_received = ctime;
485
486        //add timeout to stat:
487        if (ecm_time<=0 || ecm_time > (int)cfg.ctimeout)
488            ecm_time = cfg.ctimeout;
489        stat->time_idx++;
490        if (stat->time_idx >= LB_MAX_STAT_TIME)
491            stat->time_idx = 0;
492        stat->time_stat[stat->time_idx] = ecm_time;
493        calc_stat(stat);
494    }
495    else
496    {
497        if (rc >= 0)
498            cs_debug_mask(D_TRACE, "loadbalancer: not handled stat for reader %s: rc %d %04hX&%06lX/%04hX/%02hX time %dms",
499                rdr->label, rc, er->caid, prid, er->srvid, er->l, ecm_time);
500   
501        return;
502    }
503   
504    housekeeping_stat(0);
505       
506    cs_debug_mask(D_TRACE, "loadbalancer: adding stat for reader %s: rc %d %04hX&%06lX/%04hX/%02hX time %dms fail %d",
507                rdr->label, rc, er->caid, prid, er->srvid, er->l, ecm_time, stat->fail_factor);
508   
509    if (cfg.lb_save) {
510        stat_load_save++;
511        if (stat_load_save > cfg.lb_save)
512            save_stat_to_file(1);   
513    }
514}
515
516void reset_stat(uint16_t caid, uint32_t prid, uint16_t srvid, int16_t ecmlen)
517{
518    //cs_debug_mask(D_TRACE, "loadbalance: resetting ecm count");
519    struct s_reader *rdr;
520    for (rdr=first_active_reader; rdr ; rdr=rdr->next) {
521        if (rdr->lb_stat && rdr->client) {
522            READER_STAT *stat = get_stat(rdr, caid, prid, srvid, ecmlen);
523            if (stat) {
524                if (stat->ecm_count > 0)
525                    stat->ecm_count = 1; //not zero, so we know it's decodeable
526                stat->rc = 0;
527                stat->fail_factor = 0;
528            }
529        }
530    }
531}
532
533int32_t has_ident(FTAB *ftab, ECM_REQUEST *er) {
534
535    if (!ftab || !ftab->filts)
536        return 0;
537       
538    int32_t j, k;
539
540    for (j = 0; j < ftab->nfilts; j++) {
541        if (ftab->filts[j].caid) {
542            if (ftab->filts[j].caid==er->caid) { //caid matches!
543                int32_t nprids = ftab->filts[j].nprids;
544                if (!nprids) // No Provider ->Ok
545                    return 1;
546
547                for (k = 0; k < nprids; k++) {
548                    uint32_t prid = ftab->filts[j].prids[k];
549                    if (prid == er->prid) { //Provider matches
550                        return 1;
551                    }
552                }
553            }
554        }
555    }
556    return 0; //No match!
557}
558
559struct stat_value {
560    struct s_reader *rdr;
561    int32_t value;
562    int32_t time;
563};
564
565static struct stat_value *crt_cur(struct s_reader *rdr, int32_t value, int32_t time) {
566    struct stat_value *v;
567    cs_malloc(&v,sizeof(struct stat_value), 1);
568    v->rdr = rdr;
569    v->value = value;
570    v->time = time;
571    return v;
572}
573
574#ifdef WITH_DEBUG
575static char *strend(char *c) {
576    while (c && *c) c++;
577    return c;
578}
579#endif
580
581static int32_t get_retrylimit(ECM_REQUEST *er) {
582        int32_t i;
583        for (i = 0; i < cfg.lb_retrylimittab.n; i++) {
584                if (cfg.lb_retrylimittab.caid[i] == er->caid)
585                        return cfg.lb_retrylimittab.value[i];
586        }
587        return cfg.lb_retrylimit;
588}
589
590static int32_t get_nbest_readers(ECM_REQUEST *er) {
591        int32_t i;
592        for (i = 0; i < cfg.lb_nbest_readers_tab.n; i++) {
593                if (cfg.lb_nbest_readers_tab.caid[i] == er->caid)
594                        return cfg.lb_nbest_readers_tab.value[i];
595        }
596        return cfg.lb_nbest_readers;
597}
598
599static int32_t get_reopen_seconds(READER_STAT *stat)
600{
601        int32_t max = (INT_MAX / cfg.lb_reopen_seconds);
602        if (stat->fail_factor > max)
603                stat->fail_factor = max;
604        if (!stat->fail_factor)
605            return cfg.lb_reopen_seconds;
606        return stat->fail_factor * cfg.lb_reopen_seconds;
607}
608
609ushort get_betatunnel_caid_to(ushort caid) 
610{
611    if (caid == 0x1801) return 0x1722;
612    if (caid == 0x1833) return 0x1702;
613    if (caid == 0x1834) return 0x1722;
614    if (caid == 0x1835) return 0x1722;
615    return 0;
616}
617
618void convert_to_beta_int(ECM_REQUEST *er, uint16_t caid_to)
619{
620    unsigned char md5tmp[MD5_DIGEST_LENGTH];
621    convert_to_beta(er->client, er, caid_to);
622    // update ecmd5 for store ECM in cache
623    memcpy(er->ecmd5, MD5(er->ecm+13, er->l-13, md5tmp), CS_ECMSTORESIZE);
624}
625
626/**
627 * Gets best reader for caid/prid/srvid/ecmlen.
628 * Best reader is evaluated by lowest avg time but only if ecm_count > cfg.lb_min_ecmcount (5)
629 * Also the reader is asked if he is "available"
630 * returns ridx when found or -1 when not found
631 */
632int32_t get_best_reader(ECM_REQUEST *er)
633{
634    if (!cfg.lb_mode || cfg.lb_mode==LB_LOG_ONLY)
635        return 0;
636
637    LL_ITER it;
638    struct s_reader *rdr;
639
640#ifdef MODULE_CCCAM
641    //preferred card forwarding (CCcam client):
642    if (cfg.cc_forward_origin_card && er->origin_card) {
643   
644            struct cc_card *card = er->origin_card;
645           
646            it = ll_iter_create(er->matching_rdr);
647            while ((rdr=ll_iter_next(&it))) {
648                    if (card->origin_reader == rdr)
649                            break;             
650            }
651            if (rdr) {
652                    cs_debug_mask(D_TRACE, "loadbalancer: forward card: forced by card %d to reader %s", card->id, rdr->label);
653                    ll_clear(er->matching_rdr);
654                    ll_append(er->matching_rdr, rdr);
655                    return 1;
656            }
657    }
658#endif
659
660    uint32_t prid = get_prid(er->caid, er->prid);
661
662    //auto-betatunnel: The trick is: "let the loadbalancer decide"!
663    if (cfg.lb_auto_betatunnel && er->caid >> 8 == 0x18) { //nagra
664        ushort caid_to = get_betatunnel_caid_to(er->caid);
665        if (caid_to) {
666            int8_t needs_stats_nagra = 0, needs_stats_beta = 0;
667           
668            int32_t time_nagra = 0;
669            int32_t time_beta = 0;
670            int32_t weight;
671            int32_t time;
672           
673            READER_STAT *stat_nagra;
674            READER_STAT *stat_beta;
675           
676            //What is faster? nagra or beta?
677            it = ll_iter_create(er->matching_rdr);
678            while ((rdr=ll_iter_next(&it)) && !needs_stats_nagra && !needs_stats_beta) {
679                weight = rdr->lb_weight;
680                if (weight <= 0) weight = 1;
681               
682                stat_nagra = get_stat(rdr, er->caid, prid, er->srvid, er->l);
683                stat_beta = get_stat(rdr, caid_to, prid, er->srvid, er->l+10);
684               
685                if (stat_nagra && stat_nagra->rc == 0) {
686                    time = stat_nagra->time_avg*100/weight;
687                    if (!time_nagra || time < time_nagra)
688                        time_nagra = time;
689                }
690               
691                if (stat_beta && stat_beta->rc == 0) {
692                    time = stat_beta->time_avg*100/weight;
693                    if (!time_beta || time < time_beta)
694                        time_beta = time;
695                }
696               
697                //Uncomplete reader evaluation, we need more stats!
698                if (!stat_nagra)
699                    needs_stats_nagra = 1;
700                if (!stat_beta)
701                    needs_stats_beta = 1;
702            }
703           
704            //if we needs stats, we send 2 ecm requests: 18xx and 17xx:
705            if (needs_stats_nagra || needs_stats_beta) {
706                cs_debug_mask(D_TRACE, "loadbalancer-betatunnel %04X:%04X needs more statistics...", er->caid, caid_to);
707                if (needs_stats_beta)               
708                    convert_to_beta_int(er, caid_to);
709            }
710            else if (time_beta && (!time_nagra || time_beta <= time_nagra)) {
711                cs_debug_mask(D_TRACE, "loadbalancer-betatunnel %04X:%04X selected beta: n%dms > b%dms", er->caid, caid_to, time_nagra, time_beta);
712                convert_to_beta_int(er, caid_to);
713            }
714            else {
715                cs_debug_mask(D_TRACE, "loadbalancer-betatunnel %04X:%04X selected nagra: n%dms < b%dms", er->caid, caid_to, time_nagra, time_beta);
716            }
717            // else nagra is faster or no beta, so continue unmodified
718        }
719    }
720       
721    LLIST * result = ll_create();
722    LLIST * selected = ll_create();
723    LLIST * timeout_services = ll_create();
724   
725    struct timeb new_nulltime;
726    memset(&new_nulltime, 0, sizeof(new_nulltime));
727    time_t current_time = time(NULL);
728    int32_t current = -1;
729    READER_STAT *stat = NULL;
730    int32_t retrylimit = get_retrylimit(er);
731
732        int32_t new_stats = 0; 
733    int32_t nlocal_readers = 0;
734    int32_t nbest_readers = get_nbest_readers(er);
735    int32_t nfb_readers = cfg.lb_nfb_readers;
736    int32_t nreaders = cfg.lb_max_readers;
737    if (!nreaders) nreaders = -1;
738
739#ifdef WITH_DEBUG
740    if (cs_dblevel & 0x01) {
741        //loadbalancer debug output:
742        int32_t size = 1;
743        int32_t nr = 0;
744        it = ll_iter_create(er->matching_rdr);
745        while ((rdr=ll_iter_next(&it))) {
746            if (nr > 5) {
747                size+=20;
748                break;
749            }
750            size += strlen(rdr->label)+1;
751            nr++;
752        }
753        ll_iter_reset(&it);
754        char *rdrs = cs_malloc(&rdrs, size, 1);
755        char *rptr = rdrs;
756        *rptr = 0;
757        nr = 0;
758        while ((rdr=ll_iter_next(&it))) {
759            if (nr > 5) {
760                snprintf(rptr, size, "...(%d more)", ll_count(er->matching_rdr)-nr);
761                break;
762            }
763            snprintf(rptr, size, "%s ", rdr->label);
764            rptr = strend(rptr);
765            nr++;
766        }
767   
768        cs_debug_mask(D_TRACE, "loadbalancer: client %s for %04X&%06X/%04X/%02hX: n=%d valid readers: %s", 
769            username(er->client), er->caid, prid, er->srvid, er->l, ll_count(er->matching_rdr), rdrs);
770           
771        free(rdrs);
772    }
773#endif 
774
775    it = ll_iter_create(er->matching_rdr);
776    while ((rdr=ll_iter_next(&it)) && nreaders) {
777            struct s_client *cl = rdr->client;
778   
779            int32_t weight = rdr->lb_weight <= 0?100:rdr->lb_weight;
780               
781            stat = get_stat(rdr, er->caid, prid, er->srvid, er->l);
782            if (!stat) {
783                cs_debug_mask(D_TRACE, "loadbalancer: starting statistics for reader %s", rdr->label);
784                add_stat(rdr, er, 1, -1);
785                ll_append(result, rdr); //no statistics, this reader is active (now) but we need statistics first!
786                nreaders--;
787                new_stats = 1;
788                continue;
789            }
790           
791            if (stat->ecm_count < 0||(stat->ecm_count > cfg.lb_max_ecmcount && stat->time_avg > retrylimit)) {
792                cs_debug_mask(D_TRACE, "loadbalancer: max ecms (%d) reached by reader %s, resetting statistics", cfg.lb_max_ecmcount, rdr->label);
793                reset_stat(er->caid, prid, er->srvid, er->l);
794                ll_append(result, rdr); //max ecm reached, get new statistics
795                nreaders--;
796                continue;
797            }
798               
799            int32_t hassrvid = has_srvid(rdr->client, er) || has_ident(&rdr->ftab, er);
800           
801            if (stat->rc == 0 && stat->ecm_count < cfg.lb_min_ecmcount) {
802                cs_debug_mask(D_TRACE, "loadbalancer: reader %s needs more statistics", rdr->label);
803                ll_append(result, rdr); //need more statistics!
804                nreaders--;
805                new_stats = 1;
806                continue;
807            }
808           
809            //Reader can decode this service (rc==0) and has lb_min_ecmcount ecms:
810            if (stat->rc == 0 || hassrvid) {
811                if (cfg.preferlocalcards && !(rdr->typ & R_IS_NETWORK))
812                    nlocal_readers++; //Prefer local readers!
813
814                if (stat->rc >= 5)
815                    ll_prepend(timeout_services, rdr);
816                    //just add another reader if best reader is nonresponding but has services
817                   
818                switch (cfg.lb_mode) {
819                    default:
820                    case LB_NONE:
821                    case LB_LOG_ONLY:
822                        //cs_debug_mask(D_TRACE, "loadbalance disabled");
823                        ll_append(result, rdr);
824                        nreaders--;
825                        continue;
826                       
827                    case LB_FASTEST_READER_FIRST:
828                        current = stat->time_avg * 100 / weight;
829                        break;
830                       
831                    case LB_OLDEST_READER_FIRST:
832                        if (!rdr->lb_last.time)
833                            rdr->lb_last = nulltime;
834                        current = (1000*(rdr->lb_last.time-nulltime.time)+
835                            rdr->lb_last.millitm-nulltime.millitm);
836                        if (!new_nulltime.time || (1000*(rdr->lb_last.time-new_nulltime.time)+
837                            rdr->lb_last.millitm-new_nulltime.millitm) < 0)
838                            new_nulltime = rdr->lb_last;
839                        break;
840                       
841                    case LB_LOWEST_USAGELEVEL:
842                        current = rdr->lb_usagelevel * 100 / weight;
843                        break;
844                }
845#ifdef WEBIF
846                rdr->lbvalue = current;
847#endif
848                if (rdr->ph.c_available
849                        && !rdr->ph.c_available(rdr,
850                                AVAIL_CHECK_LOADBALANCE)) {
851                    current=current*2;
852                }
853               
854                if (cl && cl->pending)
855                    current=current*cl->pending;
856                   
857                if (current < 1)
858                    current=1;
859                ll_append(selected, crt_cur(rdr, current, stat->time_avg));
860        }
861    }
862
863    if (nlocal_readers > nbest_readers) { //if we have local readers, we prefer them!
864        nlocal_readers = nbest_readers;
865        nbest_readers = 0; 
866    }
867    else
868        nbest_readers = nbest_readers-nlocal_readers;
869
870    struct stat_value *stv;
871    it = ll_iter_create(selected);
872   
873    struct s_reader *best_rdr = NULL;
874    struct s_reader *best_rdri = NULL;
875    int32_t best_time = 0;
876    LL_NODE *fallback = NULL;
877
878    int32_t n=0;
879    while (nreaders) {
880        struct stat_value *best = NULL;
881
882        ll_iter_reset(&it);
883        while ((stv=ll_iter_next(&it))) {
884            if (nlocal_readers && (stv->rdr->typ & R_IS_NETWORK))
885                continue;
886                       
887            if (stv->value && (!best || stv->value < best->value))
888                best=stv;
889        }
890        if (!best)
891            break;
892   
893        n++;
894        best_rdri = best->rdr;
895        if (!best_rdr) {
896            best_rdr = best_rdri;
897            best_time = best->time;
898        }
899        best->value = 0;
900           
901        if (nlocal_readers) {//primary readers, local
902            if (!ll_contains(timeout_services, best_rdri))
903                nlocal_readers--;
904            ll_append(result, best_rdri);
905            nreaders--;
906            //OLDEST_READER:
907            cs_ftime(&best_rdri->lb_last);
908        }
909        else if (nbest_readers) {//primary readers, other
910            if (!ll_contains(timeout_services, best_rdri))
911                nbest_readers--;
912            ll_append(result, best_rdri);
913            nreaders--;
914            //OLDEST_READER:
915            cs_ftime(&best_rdri->lb_last);
916        }
917        else if (nfb_readers) { //fallbacks:
918            if (!ll_contains(timeout_services, best_rdri))
919                nfb_readers--;
920            LL_NODE *node = ll_append(result, best_rdri);
921            if (!fallback)
922                fallback = node;
923        }
924        else
925            break;
926    }
927    ll_destroy_data(selected);
928    ll_destroy(timeout_services);
929   
930    if (!new_stats && ll_count(result) < ll_count(er->matching_rdr)) {
931        if (!n) //no best reader found? reopen if we have ecm_count>0
932        {
933            cs_debug_mask(D_TRACE, "loadbalancer: NO MATCHING READER FOUND, reopen last valid:");
934            it = ll_iter_create(er->matching_rdr);
935            while ((rdr=ll_iter_next(&it))) {
936                stat = get_stat(rdr, er->caid, prid, er->srvid, er->l);
937                if (stat && stat->ecm_count>0 && stat->last_received+get_reopen_seconds(stat) < current_time) {
938                    if (!ll_contains(result, rdr) && nreaders) {
939                        ll_append(result, rdr);
940                        nreaders--;
941                    }
942                    n++;
943                    cs_debug_mask(D_TRACE, "loadbalancer: reopened reader %s", rdr->label);
944                }
945            }
946            cs_debug_mask(D_TRACE, "loadbalancer: reopened %d readers", n);
947        }
948
949        //algo for reopen other reader only if responsetime>retrylimit:
950        int32_t reopen = !best_rdr || (best_time && (best_time > retrylimit));
951        if (reopen) {
952#ifdef WITH_DEBUG
953            if (best_rdr)
954                cs_debug_mask(D_TRACE, "loadbalancer: reader %s reached retrylimit (%dms), reopening other readers", best_rdr->label, best_time);
955            else
956                cs_debug_mask(D_TRACE, "loadbalancer: no best reader found, reopening other readers"); 
957#endif 
958            it = ll_iter_create(er->matching_rdr);
959            while ((rdr=ll_iter_next(&it)) && nreaders) {
960                stat = get_stat(rdr, er->caid, prid, er->srvid, er->l); 
961
962                if (stat && stat->rc != 0) { //retrylimit reached:
963                    if (cfg.lb_reopen_mode || stat->last_received+get_reopen_seconds(stat) < current_time) { //Retrying reader every (900/conf) seconds
964                        stat->last_received = current_time;
965                        nreaders += ll_remove(result, rdr);
966                        ll_prepend(result, rdr);
967                        nreaders--;
968                        cs_debug_mask(D_TRACE, "loadbalancer: retrying reader %s (fail %d)", rdr->label, stat->fail_factor);
969                    }
970                }
971            }
972        }
973    }
974   
975    //Setting return values:
976    ll_destroy(er->matching_rdr);
977    er->matching_rdr = result;
978    er->fallback = fallback;
979       
980    if (new_nulltime.time)
981        nulltime = new_nulltime;
982
983#ifdef WITH_DEBUG
984    if (cs_dblevel & 0x01) {
985        //loadbalancer debug output:
986        int32_t size = 3;
987        int32_t nr = 0;
988        it = ll_iter_create(result);
989        while ((rdr=ll_iter_next(&it))) {
990            if (nr > 5) { 
991                size+=20;
992                break;
993            }
994            size += strlen(rdr->label)+1;
995            nr++;
996        }
997        ll_iter_reset(&it);
998        char *rdrs = cs_malloc(&rdrs, size, 1);
999        char *rptr = rdrs;
1000        *rptr = 0;
1001        nr = 0;
1002        while ((rdr=ll_iter_next(&it))) {
1003            if (fallback && it.cur == fallback) {
1004                snprintf(rptr, size, "[");
1005                rptr = strend(rptr);
1006            }
1007            if (nr > 5) {
1008                snprintf(rptr, size, "...(%d more)", ll_count(result)-nr);
1009                rptr = strend(rptr);
1010                break;
1011            }
1012            snprintf(rptr, size, "%s ", rdr->label);
1013            rptr = strend(rptr);
1014
1015            nr++;
1016        }
1017        if (fallback) {
1018            rptr--;
1019            *rptr=']';
1020        }
1021   
1022        cs_debug_mask(D_TRACE, "loadbalancer: client %s for %04X&%06X/%04X:%02hX: n=%d selected readers: %s", 
1023            username(er->client), er->caid, prid, er->srvid, er->l, ll_count(result), rdrs);
1024       
1025        free(rdrs);
1026    }
1027#endif 
1028
1029       
1030    return 1;
1031}
1032
1033/**
1034 * clears statistic of reader ridx.
1035 **/
1036void clear_reader_stat(struct s_reader *rdr)
1037{
1038    if (!rdr->lb_stat) 
1039        return;
1040
1041    ll_clear_data(rdr->lb_stat);
1042}
1043
1044void clear_all_stat()
1045{
1046    struct s_reader *rdr;
1047    LL_ITER itr = ll_iter_create(configured_readers);
1048    while ((rdr = ll_iter_next(&itr))) { 
1049        clear_reader_stat(rdr);
1050    }
1051}
1052
1053void housekeeping_stat_thread()
1054{   
1055    time_t cleanup_time = time(NULL) - (cfg.lb_stat_cleanup*60*60);
1056    int32_t cleaned = 0;
1057    struct s_reader *rdr;
1058    LL_ITER itr = ll_iter_create(configured_readers);
1059    while ((rdr = ll_iter_next(&itr))) {
1060        if (rdr->lb_stat) {
1061            LL_ITER it = ll_iter_create(rdr->lb_stat);
1062            READER_STAT *stat;
1063            while ((stat=ll_iter_next(&it))) {
1064               
1065                if (stat->last_received < cleanup_time) {
1066                    ll_iter_remove_data(&it);
1067                    cleaned++;
1068                }
1069            }
1070        }
1071    }
1072    cs_debug_mask(D_TRACE, "loadbalancer cleanup: removed %d entries", cleaned);
1073}
1074
1075void housekeeping_stat(int32_t force)
1076{
1077    time_t now = time(NULL);
1078    if (!force && now/60/60 == last_housekeeping/60/60) //only clean once in an hour
1079        return;
1080   
1081    last_housekeeping = now;
1082    start_thread((void*)&housekeeping_stat_thread, "housekeeping lb stats");
1083}
1084
1085static int compare_stat(READER_STAT **ps1, READER_STAT **ps2) {
1086    READER_STAT *s1 = (*ps1), *s2 = (*ps2);
1087    int res = s1->rc - s2->rc;
1088    if (res) return res;
1089    res = s1->caid - s2->caid;
1090    if (res) return res;
1091    res = s1->prid - s2->prid;
1092    if (res) return res;
1093    res = s1->srvid - s2->srvid;
1094    if (res) return res;
1095    res = s1->ecmlen - s2->ecmlen;
1096    if (res) return res;
1097    res = s1->last_received - s2->last_received;
1098    return res; 
1099}
1100
1101static int compare_stat_r(READER_STAT **ps1, READER_STAT **ps2) {
1102    return -compare_stat(ps1, ps2);
1103}
1104
1105void sort_stat(struct s_reader *rdr, int32_t reverse)
1106{
1107    if (reverse)
1108        ll_sort(rdr->lb_stat, compare_stat_r);
1109    else
1110        ll_sort(rdr->lb_stat, compare_stat);
1111}
1112
1113#endif
Note: See TracBrowser for help on using the repository browser.