Implementing NAT with OVS Connection Tracking (CT)

Date: 2025-03-25 18:21:32
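The walkthrough below follows a ct action from flow parsing in ovs-vswitchd all the way down to NAT execution in the kernel datapath. For orientation, here is the kind of OpenFlow syntax that parse_CT() below accepts; this is an illustrative setup on a hypothetical bridge br0 (names, tables and values are examples, not taken from the source):

    # Lookup only: run the packet through conntrack, continue in table 1.
    ovs-ofctl add-flow br0 "table=0, ip, actions=ct(table=1)"

    # Commit the connection in zone 1 and set ct_mark via a nested exec().
    ovs-ofctl add-flow br0 \
        "table=1, ip, ct_state=+trk+new, actions=ct(commit, zone=1, exec(set_field:1->ct_mark))"

The keywords commit, table, zone, exec and nat correspond one-to-one to the key/value branches in the parser's while loop.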
// Dispatched when an add-flow action string names type "ct".
OFPACT(CT, ofpact_conntrack, ofpact, "ct")

// Parse the CT parameters in the action string.
static char * OVS_WARN_UNUSED_RESULT
parse_CT(char *arg, const struct ofpact_parse_params *pp)
{
    const size_t ct_offset = ofpacts_pull(pp->ofpacts);
    struct ofpact_conntrack *oc;
    char *error = NULL;
    char *key, *value;

    oc = ofpact_put_CT(pp->ofpacts);
    oc->flags = 0;
    oc->recirc_table = NX_CT_RECIRC_NONE;
    while (ofputil_parse_key_value(&arg, &key, &value)) {
        if (!strcmp(key, "commit")) {
            oc->flags |= NX_CT_F_COMMIT;
        } else if (!strcmp(key, "force")) {
            oc->flags |= NX_CT_F_FORCE;
        } else if (!strcmp(key, "table")) {
            if (!ofputil_table_from_string(value, pp->table_map,
                                           &oc->recirc_table)) {
                error = xasprintf("unknown table %s", value);
            } else if (oc->recirc_table == NX_CT_RECIRC_NONE) {
                error = xasprintf("invalid table %#"PRIx8, oc->recirc_table);
            }
        } else if (!strcmp(key, "zone")) {
            error = str_to_u16(value, "zone", &oc->zone_imm);
            if (error) {
                free(error);
                error = mf_parse_subfield(&oc->zone_src, value);
                if (error) {
                    return error;
                }
            }
        } else if (!strcmp(key, "alg")) {
            error = str_to_connhelper(value, &oc->alg);
        } else if (!strcmp(key, "nat")) {
            const size_t nat_offset = ofpacts_pull(pp->ofpacts);

            error = parse_NAT(value, pp);
            /* Update CT action pointer and length. */
            pp->ofpacts->header = ofpbuf_push_uninit(pp->ofpacts, nat_offset);
            oc = pp->ofpacts->header;
        } else if (!strcmp(key, "exec")) {
            /* Hide existing actions from ofpacts_parse_copy(), so the
             * nesting can be handled transparently. */
            enum ofputil_protocol usable_protocols2;
            const size_t exec_offset = ofpacts_pull(pp->ofpacts);

            /* Initializes 'usable_protocol2', fold it back to
             * '*usable_protocols' afterwards, so that we do not lose
             * restrictions already in there. */
            struct ofpact_parse_params pp2 = *pp;
            pp2.usable_protocols = &usable_protocols2;

            error = ofpacts_parse_copy(value, &pp2, false, OFPACT_CT);
            *pp->usable_protocols &= usable_protocols2;
            pp->ofpacts->header = ofpbuf_push_uninit(pp->ofpacts, exec_offset);
            oc = pp->ofpacts->header;
        } else {
            error = xasprintf("invalid argument to \"ct\" action: `%s'", key);
        }
    }
    if (!error && oc->flags & NX_CT_F_FORCE && !(oc->flags & NX_CT_F_COMMIT)) {
        error = xasprintf("\"force\" flag requires \"commit\" flag.");
    }

    if (ofpbuf_oversized(pp->ofpacts)) {
        free(error);
        return xasprintf("input too big");
    }

    ofpact_finish_CT(pp->ofpacts, &oc);
    ofpbuf_push_uninit(pp->ofpacts, ct_offset);
    return error;
}

// FAST PATH: when a packet arrives, the kernel datapath looks up the flow
// (picking up from ovs_execute_actions(), covered in the previous post).
int ovs_execute_actions(struct datapath *dp, struct sk_buff *skb,
                        const struct sw_flow_actions *acts,
                        struct sw_flow_key *key)
{
    int err, level;

    level = __this_cpu_inc_return(exec_actions_level);
    if (unlikely(level > OVS_RECURSION_LIMIT)) {
        net_crit_ratelimited("ovs: recursion limit reached on datapath %s, probable configuration error\n",
                             ovs_dp_name(dp));
        kfree_skb(skb);
        err = -ENETDOWN;
        goto out;
    }

    OVS_CB(skb)->acts_origlen = acts->orig_len;
    // Execute the actions.
    err = do_execute_actions(dp, skb, key, acts->actions, acts->actions_len);

    if (level == 1)
        process_deferred_actions(dp);

out:
    __this_cpu_dec(exec_actions_level);
    return err;
}

/* Execute a list of actions against 'skb'. */
static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
                              struct sw_flow_key *key,
                              const struct nlattr *attr, int len)
{
    const struct nlattr *a;
    int rem;

    for (a = attr, rem = len; rem > 0; a = nla_next(a, &rem)) {
        int err = 0;

        switch (nla_type(a)) {
        ......
        case OVS_ACTION_ATTR_CT:
            if (!is_flow_key_valid(key)) {
                err = ovs_flow_key_update(skb, key);
                if (err)
                    return err;
            }

            err = ovs_ct_execute(ovs_dp_get_net(dp), skb, key,
                                 nla_data(a));

            /* Hide stolen IP fragments from user space. */
            if (err)
                return err == -EINPROGRESS ? 0 : err;
            break;
        ......
        }
    ......
}
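On the kernel side the ct action arrives as an OVS_ACTION_ATTR_CT netlink attribute inside the installed datapath flow, which is what the switch statement above dispatches on. Such a flow can be inspected from userspace; the output below is illustrative and trimmed to the relevant fields (wrapped for readability):

    $ ovs-appctl dpctl/dump-flows
    recirc_id(0),in_port(2),ct_state(-trk),eth_type(0x0800),ipv4(proto=6,frag=no),
        packets:12, bytes:888, used:0.5s,
        actions:ct(commit,zone=1,nat(src=192.0.2.10)),recirc(0x1)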
/* Returns 0 on success, -EINPROGRESS if 'skb' is stolen, or other nonzero
 * value if 'skb' is freed. */
int ovs_ct_execute(struct net *net, struct sk_buff *skb,
                   struct sw_flow_key *key,
                   const struct ovs_conntrack_info *info)
{
    int nh_ofs;
    int err;

    /* The conntrack module expects to be working at L3. */
    nh_ofs = skb_network_offset(skb);
    skb_pull_rcsum(skb, nh_ofs);

    err = ovs_skb_network_trim(skb);
    if (err)
        return err;

    if (key->ip.frag != OVS_FRAG_TYPE_NONE) {
        err = handle_fragments(net, key, info->zone.id, skb);
        if (err)
            return err;
    }

    if (info->commit)   // e.g. ct(commit,table=1)
        err = ovs_ct_commit(net, key, info, skb);
    else                // e.g. ct(table=1)
        err = ovs_ct_lookup(net, key, info, skb);

    skb_push(skb, nh_ofs);
    skb_postpush_rcsum(skb, skb->data, nh_ofs);
    if (err)
        kfree_skb(skb);
    return err;
}

// Applies the ct_label, ct_mark, nat and commit parameters.
static int ovs_ct_commit(struct net *net, struct sw_flow_key *key,
                         const struct ovs_conntrack_info *info,
                         struct sk_buff *skb)
{
    enum ip_conntrack_info ctinfo;
    struct nf_conn *ct;
    int err;

    // NAT is applied inside __ovs_ct_lookup().
    err = __ovs_ct_lookup(net, key, info, skb);
    if (err)
        return err;

    /* The connection could be invalid, in which case this is a no-op. */
    ct = nf_ct_get(skb, &ctinfo);
    if (!ct)
        return 0;
    ......
    // Set ct_mark.
    if (info->mark.mask) {
        err = ovs_ct_set_mark(ct, key, info->mark.value, info->mark.mask);
        if (err)
            return err;
    }
    // Set ct_label.
    if (!nf_ct_is_confirmed(ct)) {
        err = ovs_ct_init_labels(ct, key, &info->labels.value,
                                 &info->labels.mask);
        if (err)
            return err;
    } else if (IS_ENABLED(CONFIG_NF_CONNTRACK_LABELS) &&
               labels_nonzero(&info->labels.mask)) {
        err = ovs_ct_set_labels(ct, key, &info->labels.value,
                                &info->labels.mask);
        if (err)
            return err;
    }
    /* This will take care of sending queued events even if the connection
     * is already confirmed. */
    if (nf_conntrack_confirm(skb) != NF_ACCEPT)
        return -EINVAL;

    return 0;
}

static int __ovs_ct_lookup(struct net *net, struct sw_flow_key *key,
                           const struct ovs_conntrack_info *info,
                           struct sk_buff *skb)
{
    bool cached = skb_nfct_cached(net, key, info, skb);
    enum ip_conntrack_info ctinfo;
    struct nf_conn *ct;

    if (!cached) {
        struct nf_hook_state state = {
            .hook = NF_INET_PRE_ROUTING,
            .pf = info->family,
            .net = net,
        };
        struct nf_conn *tmpl = info->ct;
        int err;

        /* Associate skb with specified zone. */
        if (tmpl) {
            if (skb_nfct(skb))
                nf_conntrack_put(skb_nfct(skb));
            nf_conntrack_get(&tmpl->ct_general);
            nf_ct_set(skb, tmpl, IP_CT_NEW);
        }

        err = nf_conntrack_in(skb, &state);
        if (err != NF_ACCEPT)
            return -ENOENT;

        /* Clear CT state NAT flags to mark that we have not yet done
         * NAT after the nf_conntrack_in() call.  We can actually clear
         * the whole state, as it will be re-initialized below. */
        key->ct_state = 0;

        /* Update the key, but keep the NAT flags. */
        ovs_ct_update_key(skb, info, key, true, true);
    }

    ct = nf_ct_get(skb, &ctinfo);
    if (ct) {
        bool add_helper = false;

        // Handle ct(nat).
        if (info->nat && !(key->ct_state & OVS_CS_F_NAT_MASK) &&
            (nf_ct_is_confirmed(ct) || info->commit) &&
            ovs_ct_nat(net, key, info, skb, ct, ctinfo) != NF_ACCEPT) {
            return -EINVAL;
        }
        ......
}
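Because ovs_ct_commit() ends by confirming the connection (nf_conntrack_confirm()), later packets of the same flow come back from ct() with ct_state carrying +est rather than +new. The classic stateful-firewall pattern builds on exactly this; illustrative flows, assuming table 1 is the recirculation table used by the earlier examples:

    # Allow packets of connections that were previously committed.
    ovs-ofctl add-flow br0 "table=1, ip, ct_state=+trk+est, actions=normal"
    # Drop packets conntrack considers invalid.
    ovs-ofctl add-flow br0 "table=1, ip, ct_state=+trk+inv, actions=drop"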
static int ovs_ct_nat(struct net *net, struct sw_flow_key *key,
                      const struct ovs_conntrack_info *info,
                      struct sk_buff *skb, struct nf_conn *ct,
                      enum ip_conntrack_info ctinfo)
{
    enum nf_nat_manip_type maniptype;
    int err;

#ifdef HAVE_NF_CT_IS_UNTRACKED
    if (nf_ct_is_untracked(ct)) {
        /* A NAT action may only be performed on tracked packets. */
        return NF_ACCEPT;
    }
#endif /* HAVE_NF_CT_IS_UNTRACKED */

    /* Add NAT extension if not confirmed yet. */
    if (!nf_ct_is_confirmed(ct) && !nf_ct_nat_ext_add(ct))
        return NF_ACCEPT;   /* Can't NAT. */

    /* Determine NAT type.
     * Check if the NAT type can be deduced from the tracked connection.
     * Make sure new expected connections (IP_CT_RELATED) are NATted only
     * when committing. */
    if (info->nat & OVS_CT_NAT && ctinfo != IP_CT_NEW &&
        ct->status & IPS_NAT_MASK &&
        (ctinfo != IP_CT_RELATED || info->commit)) {
        /* NAT an established or related connection like before. */
        if (CTINFO2DIR(ctinfo) == IP_CT_DIR_REPLY)
            /* This is the REPLY direction for a connection
             * for which NAT was applied in the forward
             * direction.  Do the reverse NAT. */
            maniptype = ct->status & IPS_SRC_NAT
                        ? NF_NAT_MANIP_DST : NF_NAT_MANIP_SRC;
        else
            maniptype = ct->status & IPS_SRC_NAT
                        ? NF_NAT_MANIP_SRC : NF_NAT_MANIP_DST;
    } else if (info->nat & OVS_CT_SRC_NAT) {
        maniptype = NF_NAT_MANIP_SRC;
    } else if (info->nat & OVS_CT_DST_NAT) {
        maniptype = NF_NAT_MANIP_DST;
    } else {
        return NF_ACCEPT; /* Connection is not NATed. */
    }

    // Execute the NAT action proper.
    err = ovs_ct_nat_execute(skb, ct, ctinfo, &info->range, maniptype);

    if (err == NF_ACCEPT && ct->status & IPS_DST_NAT) {
        if (ct->status & IPS_SRC_NAT) {
            if (maniptype == NF_NAT_MANIP_SRC)
                maniptype = NF_NAT_MANIP_DST;
            else
                maniptype = NF_NAT_MANIP_SRC;

            err = ovs_ct_nat_execute(skb, ct, ctinfo, &info->range,
                                     maniptype);
        } else if (CTINFO2DIR(ctinfo) == IP_CT_DIR_ORIGINAL) {
            err = ovs_ct_nat_execute(skb, ct, ctinfo, NULL,
                                     NF_NAT_MANIP_SRC);
        }
    }

    /* Mark NAT done if successful and update the flow key. */
    if (err == NF_ACCEPT)
        ovs_nat_update_key(key, skb, maniptype);

    return err;
}
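The direction logic above is what lets a bare ct(nat) un-NAT reply traffic: in the reply direction of a connection whose original direction was source-NATted (IPS_SRC_NAT), maniptype flips to NF_NAT_MANIP_DST. A minimal SNAT setup that relies on this behavior (illustrative ports and addresses):

    # Original direction: commit and source-NAT new connections.
    ovs-ofctl add-flow br0 \
        "table=0, in_port=1, ip, actions=ct(commit, nat(src=192.0.2.10), table=1)"
    # Reply direction: a bare nat reverses the translation recorded on
    # the connection, no address needs to be repeated here.
    ovs-ofctl add-flow br0 \
        "table=0, in_port=2, ip, actions=ct(nat, table=1)"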
static int ovs_ct_nat_execute(struct sk_buff *skb, struct nf_conn *ct,
                              enum ip_conntrack_info ctinfo,
                              const struct nf_nat_range2 *range,
                              enum nf_nat_manip_type maniptype)
{
    int hooknum, nh_off, err = NF_ACCEPT;

    nh_off = skb_network_offset(skb);
    skb_pull_rcsum(skb, nh_off);

    /* See HOOK2MANIP(). */
    if (maniptype == NF_NAT_MANIP_SRC)
        hooknum = NF_INET_LOCAL_IN;  /* Source NAT */
    else
        hooknum = NF_INET_LOCAL_OUT; /* Destination NAT */

    switch (ctinfo) {
    case IP_CT_RELATED:
    case IP_CT_RELATED_REPLY:
        if (IS_ENABLED(CONFIG_NF_NAT_IPV4) &&
            skb->protocol == htons(ETH_P_IP) &&
            ip_hdr(skb)->protocol == IPPROTO_ICMP) {
            if (!nf_nat_icmp_reply_translation(skb, ct, ctinfo,
                                               hooknum))
                err = NF_DROP;
            goto push;
        } else if (IS_ENABLED(CONFIG_NF_NAT_IPV6) &&
                   skb->protocol == htons(ETH_P_IPV6)) {
            __be16 frag_off;
            u8 nexthdr = ipv6_hdr(skb)->nexthdr;
            int hdrlen = ipv6_skip_exthdr(skb, sizeof(struct ipv6hdr),
                                          &nexthdr, &frag_off);

            if (hdrlen >= 0 && nexthdr == IPPROTO_ICMPV6) {
                if (!nf_nat_icmpv6_reply_translation(skb, ct, ctinfo,
                                                     hooknum, hdrlen))
                    err = NF_DROP;
                goto push;
            }
        }
        /* Non-ICMP, fall thru to initialize if needed. */
        /* fall through */
    case IP_CT_NEW:
        /* Seen it before?  This can happen for loopback, retrans,
         * or local packets. */
        if (!nf_nat_initialized(ct, maniptype)) {
            /* Initialize according to the NAT action. */
            err = (range && range->flags & NF_NAT_RANGE_MAP_IPS)
                  /* Action is set up to establish a new mapping. */
                  ? nf_nat_setup_info(ct, range, maniptype)
                  : nf_nat_alloc_null_binding(ct, hooknum);
            if (err != NF_ACCEPT)
                goto push;
        }
        break;

    case IP_CT_ESTABLISHED:
    case IP_CT_ESTABLISHED_REPLY:
        break;

    default:
        err = NF_DROP;
        goto push;
    }

    err = nf_nat_packet(ct, ctinfo, hooknum, skb);
push:
    skb_push(skb, nh_off);
    skb_postpush_rcsum(skb, skb->data, nh_off);
    return err;
}

static int ovs_ct_lookup(struct net *net, struct sw_flow_key *key,
                         const struct ovs_conntrack_info *info,
                         struct sk_buff *skb)
{
    struct nf_conntrack_expect *exp;

    exp = ovs_ct_expect_find(net, &info->zone, info->family, skb);
    if (exp) {
        u8 state;

        // An expectation matched: mark ct_state as +trk+new+rel.
        state = OVS_CS_F_TRACKED | OVS_CS_F_NEW | OVS_CS_F_RELATED;
        __ovs_ct_update_key(key, state, &info->zone, exp->master);
    } else {
        struct nf_conn *ct;
        int err;

        err = __ovs_ct_lookup(net, key, info, skb);
        if (err)
            return err;

        ct = (struct nf_conn *)skb_nfct(skb);
        if (ct)
            nf_ct_deliver_cached_events(ct);
    }

    return 0;
}

// SLOW PATH: on a kernel-side miss, an upcall hands the packet to
// userspace for lookup.
static void *udpif_upcall_handler(void *arg)
{
    struct handler *handler = arg;
    struct udpif *udpif = handler->udpif;

    while (!latch_is_set(&handler->udpif->exit_latch)) {
        if (recv_upcalls(handler)) {
            poll_immediate_wake();
        } else {
            dpif_recv_wait(udpif->dpif, handler->handler_id);
            latch_wait(&udpif->exit_latch);
        }
        poll_block();
    }

    return NULL;
}

// 2. Receive upcalls in userspace.
static size_t recv_upcalls(struct handler *handler)
{
    struct udpif *udpif = handler->udpif;
    uint64_t recv_stubs[UPCALL_MAX_BATCH][512 / 8];
    struct ofpbuf recv_bufs[UPCALL_MAX_BATCH];
    struct dpif_upcall dupcalls[UPCALL_MAX_BATCH];
    struct upcall upcalls[UPCALL_MAX_BATCH];
    struct flow flows[UPCALL_MAX_BATCH];
    size_t n_upcalls, i;

    n_upcalls = 0;
    while (n_upcalls < UPCALL_MAX_BATCH) {
        struct ofpbuf *recv_buf = &recv_bufs[n_upcalls];
        struct dpif_upcall *dupcall = &dupcalls[n_upcalls];
        struct upcall *upcall = &upcalls[n_upcalls];
        struct flow *flow = &flows[n_upcalls];
        unsigned int mru = 0;
        uint64_t hash = 0;
        int error;

        ofpbuf_use_stub(recv_buf, recv_stubs[n_upcalls],
                        sizeof recv_stubs[n_upcalls]);
        // 2.1 Receive an upcall request.
        if (dpif_recv(udpif->dpif, handler->handler_id, dupcall, recv_buf)) {
            ofpbuf_uninit(recv_buf);
            break;
        }

        upcall->fitness = odp_flow_key_to_flow(dupcall->key, dupcall->key_len,
                                               flow, NULL);
        if (upcall->fitness == ODP_FIT_ERROR) {
            goto free_dupcall;
        }

        if (dupcall->mru) {
            mru = nl_attr_get_u16(dupcall->mru);
        }

        if (dupcall->hash) {
            hash = nl_attr_get_u64(dupcall->hash);
        }

        // 2.2 Classify the data sent up by the kernel.
        error = upcall_receive(upcall, udpif->backer, &dupcall->packet,
                               dupcall->type, dupcall->userdata, flow, mru,
                               &dupcall->ufid, PMD_ID_NULL);
        if (error) {
            if (error == ENODEV) {
                /* Received packet on datapath port for which we couldn't
                 * associate an ofproto.  This can happen if a port is
                 * removed while traffic is being received.  Print a
                 * rate-limited message in case it happens frequently. */
                dpif_flow_put(udpif->dpif, DPIF_FP_CREATE, dupcall->key,
                              dupcall->key_len, NULL, 0, NULL, 0,
                              &dupcall->ufid, PMD_ID_NULL, NULL);
                VLOG_INFO_RL(&rl, "received packet on unassociated datapath "
                             "port %"PRIu32, flow->in_port.odp_port);
            }
            goto free_dupcall;
        }

        upcall->key = dupcall->key;
        upcall->key_len = dupcall->key_len;
        upcall->ufid = &dupcall->ufid;
        upcall->hash = hash;

        upcall->out_tun_key = dupcall->out_tun_key;
        upcall->actions = dupcall->actions;

        pkt_metadata_from_flow(&dupcall->packet.md, flow);
        // 2.3 Extract the flow.
        flow_extract(&dupcall->packet, flow);

        // 2.4 Process the upcall data.
        error = process_upcall(udpif, upcall,
                               &upcall->odp_actions, &upcall->wc);
        if (error) {
            goto cleanup;
        }

        n_upcalls++;
        continue;
    }
    ......
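The handler threads that run recv_upcalls(), and how the upcall path is doing overall, can be checked at runtime. The exact fields vary by OVS version; the output below is illustrative and heavily trimmed:

    $ ovs-appctl upcall/show
    system@ovs-system:
      flows         : (current 3) (avg 2) (max 18) (limit 10000)
      dump duration : 1ms
      ...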
// 2.4 Dispatch to upcall_xlate.
static int process_upcall(struct udpif *udpif, struct upcall *upcall,
                          struct ofpbuf *odp_actions,
                          struct flow_wildcards *wc)
{
    const struct dp_packet *packet = upcall->packet;
    const struct flow *flow = upcall->flow;
    size_t actions_len = 0;

    switch (upcall->type) {
    case MISS_UPCALL:
    case SLOW_PATH_UPCALL:
        // 2.4.1 Handle the upcall type.
        upcall_xlate(udpif, upcall, odp_actions, wc);
        return 0;
    }
}

// 2.4.1 Handle the upcall type.
static void upcall_xlate(struct udpif *udpif, struct upcall *upcall,
                         struct ofpbuf *odp_actions,
                         struct flow_wildcards *wc)
{
    struct dpif_flow_stats stats;
    enum xlate_error xerr;
    struct xlate_in xin;
    struct ds output;

    stats.n_packets = 1;
    stats.n_bytes = dp_packet_size(upcall->packet);
    stats.used = time_msec();
    stats.tcp_flags = ntohs(upcall->flow->tcp_flags);

    xlate_in_init(&xin, upcall->ofproto,
                  ofproto_dpif_get_tables_version(upcall->ofproto),
                  upcall->flow, upcall->ofp_in_port, NULL,
                  stats.tcp_flags, upcall->packet, wc, odp_actions);

    upcall->reval_seq = seq_read(udpif->reval_seq);

    // 2.4.1.1 Walk the flow tables looking for a match.
    xerr = xlate_actions(&xin, &upcall->xout);

    /* Translate again and log the ofproto trace for
     * these two error types. */

    /* This function is also called for slow-pathed flows.  As we are only
     * going to create new datapath flows for actual datapath misses, there is
     * no point in creating a ukey otherwise. */
    if (upcall->type == MISS_UPCALL) {
        upcall->ukey = ukey_create_from_upcall(upcall, wc);
    }
}

// 2.4.1.1 Match against the flow tables.
enum xlate_error xlate_actions(struct xlate_in *xin, struct xlate_out *xout)
{
    *xout = (struct xlate_out) {
        .slow = 0,
        .recircs = RECIRC_REFS_EMPTY_INITIALIZER,
    };

    if (!xin->ofpacts && !ctx.rule) {
        // 2.4.1.1 Walk the flow tables one by one looking for a match.
        ctx.rule = rule_dpif_lookup_from_table(
            ctx.xbridge->ofproto, ctx.xin->tables_version, flow, ctx.wc,
            ctx.xin->resubmit_stats, &ctx.table_id,
            flow->in_port.ofp_port, true, true, ctx.xin->xcache);
    }

    // 2.4.1.2 Execute the actions.
    do_xlate_actions(ofpacts, ofpacts_len, &ctx, true, false);
}

// 2.4.1.2 Execute the CT-module actions.
static void do_xlate_actions(const struct ofpact *ofpacts, size_t ofpacts_len,
                             struct xlate_ctx *ctx, bool is_last_action,
                             bool group_bucket_action)
{
    ......
    case OFPACT_CT:
        compose_conntrack_action(ctx, ofpact_get_CT(a), last);
        break;

    case OFPACT_CT_CLEAR:
        if (ctx->conntracked) {
            compose_ct_clear_action(ctx);
        }
        break;

    case OFPACT_NAT:
        /* This will be processed by compose_conntrack_action(). */
        ctx->ct_nat_action = ofpact_get_NAT(a);
        break;
}
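This whole translation path, xlate_actions() through compose_conntrack_action() below, can be observed without sending real traffic by tracing a synthetic packet. A hypothetical invocation, with the output abridged to the final line:

    $ ovs-appctl ofproto/trace br0 "in_port=1,tcp,nw_src=10.0.0.2,nw_dst=198.51.100.1"
    ...
    Datapath actions: ct(commit,zone=1,nat(src=192.0.2.10)),recirc(0x1)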
static void compose_conntrack_action(struct xlate_ctx *ctx,
                                     struct ofpact_conntrack *ofc,
                                     bool is_last_action)
{
    /* Ensure that any prior actions are applied before composing the new
     * conntrack action. */
    xlate_commit_actions(ctx);

    /* Process nested actions first, to populate the key. */
    ctx->ct_nat_action = NULL;
    ctx->wc->masks.ct_mark = 0;
    ctx->wc->masks.ct_label = OVS_U128_ZERO;
    do_xlate_actions(ofc->actions, ofpact_ct_get_action_len(ofc), ctx,
                     is_last_action, false);

    ct_offset = nl_msg_start_nested(ctx->odp_actions, OVS_ACTION_ATTR_CT);
    if (ofc->flags & NX_CT_F_COMMIT) {
        nl_msg_put_flag(ctx->odp_actions, ofc->flags & NX_CT_F_FORCE
                        ? OVS_CT_ATTR_FORCE_COMMIT : OVS_CT_ATTR_COMMIT);
        if (ctx->xbridge->support.ct_eventmask) {
            nl_msg_put_u32(ctx->odp_actions, OVS_CT_ATTR_EVENTMASK,
                           OVS_CT_EVENTMASK_DEFAULT);
        }
        if (ctx->xbridge->support.ct_timeout) {
            put_ct_timeout(ctx->odp_actions, ctx->xbridge->ofproto->backer,
                           &ctx->xin->flow, ctx->wc, zone);
        }
    }
    nl_msg_put_u16(ctx->odp_actions, OVS_CT_ATTR_ZONE, zone);
    put_ct_mark(&ctx->xin->flow, ctx->odp_actions, ctx->wc);
    put_ct_label(&ctx->xin->flow, ctx->odp_actions, ctx->wc);
    put_ct_helper(ctx, ctx->odp_actions, ofc);
    put_ct_nat(ctx);
    nl_msg_end_nested(ctx->odp_actions, ct_offset);

    ctx->wc->masks.ct_mark = old_ct_mark_mask;
    ctx->wc->masks.ct_label = old_ct_label_mask;

    if (ofc->recirc_table != NX_CT_RECIRC_NONE) {
        ctx->conntracked = true;
        // Once userspace has matched the flow, a corresponding rule is
        // installed in the kernel datapath; recirculation resumes
        // processing in the specified table.
        compose_recirculate_and_fork(ctx, ofc->recirc_table, zone);
    }

    ctx->ct_nat_action = NULL;

    /* The ct_* fields are only available in the scope of the 'recirc_table'
     * call chain. */
    flow_clear_conntrack(&ctx->xin->flow);
    xlate_report(ctx, OFT_DETAIL, "Sets the packet to an untracked state, "
                 "and clears all the conntrack fields.");
    ctx->conntracked = false;
}
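Once packets have traversed the committed ct/NAT path end to end, the resulting entries, including the NAT rewrite recorded on the connection, can be verified from the datapath conntrack table. Illustrative output (addresses match the earlier SNAT example; wrapped for readability):

    $ ovs-appctl dpctl/dump-conntrack
    tcp,orig=(src=10.0.0.2,dst=198.51.100.1,sport=34567,dport=80),
        reply=(src=198.51.100.1,dst=192.0.2.10,sport=80,dport=34567),
        zone=1,protoinfo=(state=ESTABLISHED)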